/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.data.management.conversion.hive.watermarker;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.conversion.hive.converter.AbstractAvroToOrcConverter;
import org.apache.gobblin.data.management.conversion.hive.provider.HiveUnitUpdateProvider;
import org.apache.gobblin.data.management.conversion.hive.provider.UpdateProviderFactory;
import org.apache.gobblin.data.management.conversion.hive.watermarker.HiveSourceWatermarker;
import org.apache.gobblin.data.management.conversion.hive.watermarker.HiveSourceWatermarkerFactory;
import org.apache.gobblin.data.management.conversion.hive.watermarker.MultiKeyValueLongWatermark;
import org.apache.gobblin.data.management.conversion.hive.watermarker.TableLevelWatermarker;
import org.apache.gobblin.hive.HiveMetastoreClientPool;
import org.apache.gobblin.source.extractor.Watermark;
import org.apache.gobblin.source.extractor.WatermarkInterval;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.AutoReturnableObject;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.thrift.TException;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link HiveSourceWatermarker} that keeps one watermark per partition of a table.
 *
 * <p>Partition watermarks of the previous run are read, per dataset urn, from the single previous
 * workunit state that carries {@link #IS_WATERMARK_WORKUNIT_KEY}. Expected high watermarks for the
 * current run start as a copy of the previous ones, are updated through
 * {@link #onTableProcessBegin(Table, long)} and {@link #onPartitionProcessBegin(Partition, long, long)},
 * and are published by {@link #onGetWorkunitsEnd(List)} as one extra workunit per partitioned table.
 * Table level watermarks are delegated to a {@link TableLevelWatermarker}.
 */
public class PartitionLevelWatermarker
implements HiveSourceWatermarker {
    private static final Logger log = LoggerFactory.getLogger(PartitionLevelWatermarker.class);

    /** Property present on the workunit that carries all partition watermarks of a table. */
    public static final String IS_WATERMARK_WORKUNIT_KEY = "hive.source.watermark.isWatermarkWorkUnit";

    // Configuration keys and defaults read by this class.
    private static final String METASTORE_URI_KEY = "hive.dataset.hive.metastore.uri";
    private static final String MAX_LOOKBACK_DAYS_KEY = "hive.source.maximum.lookbackDays";
    private static final int DEFAULT_MAX_LOOKBACK_DAYS = 3;
    private static final String DATASET_URN_KEY = "dataset.urn";

    /** Joins partition values into the key used to identify a partition within its table. */
    private static final Joiner PARTITION_VALUES_JOINER = Joiner.on(",");

    /** Maps a partition spec (name to value) to its partition key by joining the spec's values. */
    private static final Function<Map<String, String>, String> PARTITION_SPEC_TO_KEY = new Function<Map<String, String>, String>(){

        @Override
        public String apply(Map<String, String> input) {
            return PARTITION_VALUES_JOINER.join(input.values());
        }
    };

    /** Matches workunit states that contain the {@link #IS_WATERMARK_WORKUNIT_KEY} property. */
    static final Predicate<WorkUnitState> WATERMARK_WORKUNIT_PREDICATE = new Predicate<WorkUnitState>(){

        @Override
        public boolean apply(@Nonnull WorkUnitState input) {
            return input.contains(PartitionLevelWatermarker.IS_WATERMARK_WORKUNIT_KEY);
        }
    };

    /**
     * Epoch millis threshold: partition watermarks strictly below this value are dropped when the
     * watermark workunits are created in {@link #onGetWorkunitsEnd(List)}.
     */
    @VisibleForTesting
    protected long leastWatermarkToPersistInState;

    /** Days added on top of the configured maximum look back when computing the persistence threshold. */
    protected static final int BUFFER_WATERMARK_DAYS_TO_PERSIST = 2;

    /** Partition watermarks loaded from the previous state, keyed by table key. */
    @VisibleForTesting
    protected final TableWatermarks previousWatermarks;

    /** Partition watermarks expected at the end of this run, keyed by table key. */
    @VisibleForTesting
    protected final TableWatermarks expectedHighWatermarks = new TableWatermarks();
    protected final HiveMetastoreClientPool pool;
    protected final TableLevelWatermarker tableLevelWatermarker;
    protected final HiveUnitUpdateProvider updateProvider;

    /**
     * Builds the watermarker and, when {@code state} is a {@link SourceState}, loads the partition
     * watermarks persisted by the previous run.
     *
     * @param state job state; also used to create the metastore client pool, the update provider
     *        and the delegate {@link TableLevelWatermarker}
     * @throws RuntimeException if the metastore client pool cannot be initialized
     * @throws IllegalStateException if a dataset has more than one previous watermark workunit
     */
    public PartitionLevelWatermarker(State state) {
        this.previousWatermarks = new TableWatermarks();
        this.tableLevelWatermarker = new TableLevelWatermarker(state);
        this.updateProvider = UpdateProviderFactory.create(state);
        try {
            this.pool = HiveMetastoreClientPool.get(state.getProperties(), Optional.fromNullable(state.getProp(METASTORE_URI_KEY)));
        }
        catch (IOException e) {
            throw new RuntimeException("Could not initialize metastore client pool", e);
        }
        int maxLookBackDays = state.getPropAsInt(MAX_LOOKBACK_DAYS_KEY, DEFAULT_MAX_LOOKBACK_DAYS) + BUFFER_WATERMARK_DAYS_TO_PERSIST;
        this.leastWatermarkToPersistInState = new DateTime().minusDays(maxLookBackDays).getMillis();
        if (state instanceof SourceState) {
            this.loadPreviousWatermarks((SourceState)state);
        }
    }

    /**
     * Populates {@link #previousWatermarks} from the previous workunit states and seeds
     * {@link #expectedHighWatermarks} with an independent copy of each table's watermarks.
     */
    private void loadPreviousWatermarks(SourceState sourceState) {
        for (Map.Entry<String, Iterable<WorkUnitState>> datasetWorkUnitStates : sourceState.getPreviousWorkUnitStatesByDatasetUrns().entrySet()) {
            String datasetUrn = datasetWorkUnitStates.getKey();
            List<WorkUnitState> watermarkWorkUnits = Lists.newArrayList(Iterables.filter(datasetWorkUnitStates.getValue(), WATERMARK_WORKUNIT_PREDICATE));
            if (watermarkWorkUnits.isEmpty()) {
                log.info("No previous partition watermarks for table {}", datasetUrn);
                continue;
            }
            if (watermarkWorkUnits.size() > 1) {
                throw new IllegalStateException(String.format("Each table should have only 1 watermark workunit that contains watermarks for all its partitions. Found %s", watermarkWorkUnits.size()));
            }
            MultiKeyValueLongWatermark multiKeyValueLongWatermark = watermarkWorkUnits.get(0).getActualHighWatermark(MultiKeyValueLongWatermark.class);
            if (multiKeyValueLongWatermark == null) {
                log.warn("Previous workunit for {} has {} set but null MultiKeyValueLongWatermark found", datasetUrn, IS_WATERMARK_WORKUNIT_KEY);
                continue;
            }
            this.previousWatermarks.setPartitionWatermarks(datasetUrn, multiKeyValueLongWatermark.getWatermarks());
        }
        log.debug("Loaded partition watermarks from previous state {}", this.previousWatermarks);
        // Copy each inner map so that updating the expected watermarks never alters the previous ones.
        for (Map.Entry<String, Map<String, Long>> previous : this.previousWatermarks.entrySet()) {
            this.expectedHighWatermarks.setPartitionWatermarks(previous.getKey(), Maps.newHashMap(previous.getValue()));
        }
    }

    /**
     * Ensures an (initially empty) expected watermark map exists for {@code table}. An existing
     * map, for example one copied from the previous state, is left untouched.
     *
     * @param table table about to be processed; must not be null
     * @param tableProcessTime not used by this implementation
     */
    @Override
    public void onTableProcessBegin(Table table, long tableProcessTime) {
        Preconditions.checkNotNull(table);
        this.expectedHighWatermarks.putIfAbsent(PartitionLevelWatermarker.tableKey(table), Maps.<String, Long>newHashMap());
    }

    /**
     * Records {@code partitionUpdateTime} as the expected high watermark of {@code partition},
     * after removing the watermarks of the partitions reported by
     * {@link AbstractAvroToOrcConverter#getDropPartitionsDDLInfo(Partition)}.
     *
     * @param partition partition about to be processed; it and its table must not be null
     * @param partitionProcessTime not used by this implementation
     * @param partitionUpdateTime value stored as the partition's expected high watermark
     * @throws IllegalStateException if no watermark map exists yet for the partition's table
     */
    @Override
    public void onPartitionProcessBegin(Partition partition, long partitionProcessTime, long partitionUpdateTime) {
        Preconditions.checkNotNull(partition);
        Preconditions.checkNotNull(partition.getTable());
        String tableKey = PartitionLevelWatermarker.tableKey(partition.getTable());
        if (!this.expectedHighWatermarks.hasPartitionWatermarks(tableKey)) {
            throw new IllegalStateException(String.format("onPartitionProcessBegin called before onTableProcessBegin for table: %s, partitions: %s", tableKey, PartitionLevelWatermarker.partitionKey(partition)));
        }
        Collection<String> droppedPartitions = Collections2.transform(AbstractAvroToOrcConverter.getDropPartitionsDDLInfo(partition), PARTITION_SPEC_TO_KEY);
        this.expectedHighWatermarks.removePartitionWatermarks(tableKey, droppedPartitions);
        this.expectedHighWatermarks.addPartitionWatermark(tableKey, PartitionLevelWatermarker.partitionKey(partition), partitionUpdateTime);
    }

    /**
     * Returns the previous high watermark of {@code table} as reported by the delegate
     * {@link TableLevelWatermarker}.
     */
    @Override
    public LongWatermark getPreviousHighWatermark(Table table) {
        return this.tableLevelWatermarker.getPreviousHighWatermark(table);
    }

    /**
     * Returns the watermark loaded from the previous state for {@code partition}, or a watermark
     * of {@code 0} when none was loaded for it.
     */
    @Override
    public LongWatermark getPreviousHighWatermark(Partition partition) {
        Map<String, Long> tableWatermarks = this.previousWatermarks.getPartitionWatermarks(PartitionLevelWatermarker.tableKey(partition.getTable()));
        Long previous = tableWatermarks == null ? null : tableWatermarks.get(PartitionLevelWatermarker.partitionKey(partition));
        return new LongWatermark(previous == null ? 0L : previous.longValue());
    }

    /**
     * Appends one watermark workunit per partitioned table to {@code workunits}. Each such workunit
     * is flagged with {@link #IS_WATERMARK_WORKUNIT_KEY}, carries the table key as its dataset urn,
     * and holds a watermark interval going from the table's previous partition watermarks to its
     * expected ones, restricted to values at or above {@link #leastWatermarkToPersistInState}.
     *
     * @param workunits list the watermark workunits are added to
     * @throws RuntimeException wrapping any {@link IOException} or {@link TException} raised while
     *         talking to the metastore
     */
    @Override
    public void onGetWorkunitsEnd(List<WorkUnit> workunits) {
        // Captured once; the filtered view below is copied eagerly, so this is the value it sees anyway.
        final long leastWatermarkToPersist = this.leastWatermarkToPersistInState;
        try (AutoReturnableObject<IMetaStoreClient> client = this.pool.getClient()) {
            for (Map.Entry<String, Map<String, Long>> tableWatermark : this.expectedHighWatermarks.entrySet()) {
                String tableKey = tableWatermark.getKey();
                // The table key is split on '@'; the two parts are the arguments of the metastore lookup.
                String[] dbAndTable = tableKey.split("@");
                if (!new Table(client.get().getTable(dbAndTable[0], dbAndTable[1])).isPartitioned()) {
                    continue;
                }
                Map<String, Long> expectedPartitionWatermarks = ImmutableMap.copyOf(Maps.filterEntries(tableWatermark.getValue(), new Predicate<Map.Entry<String, Long>>(){

                    @Override
                    public boolean apply(@Nonnull Map.Entry<String, Long> input) {
                        return input.getValue() >= leastWatermarkToPersist;
                    }
                }));
                WorkUnit watermarkWorkunit = WorkUnit.createEmpty();
                watermarkWorkunit.setProp(IS_WATERMARK_WORKUNIT_KEY, true);
                watermarkWorkunit.setProp(DATASET_URN_KEY, tableKey);
                // The low watermark argument is null when nothing was loaded for this table.
                watermarkWorkunit.setWatermarkInterval(new WatermarkInterval(new MultiKeyValueLongWatermark(this.previousWatermarks.getPartitionWatermarks(tableKey)), new MultiKeyValueLongWatermark(expectedPartitionWatermarks)));
                workunits.add(watermarkWorkunit);
            }
        }
        catch (IOException | TException e) {
            // Both types are checked exceptions, so this is what Throwables.propagate(e) would throw.
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the update time of {@code table}, as reported by the update provider, as its expected
     * high watermark.
     *
     * @param tableProcessTime not used by this implementation
     */
    @Override
    public LongWatermark getExpectedHighWatermark(Table table, long tableProcessTime) {
        return new LongWatermark(this.updateProvider.getUpdateTime(table));
    }

    /**
     * Returns the expected high watermark currently recorded for {@code partition}.
     *
     * @param tableProcessTime not used by this implementation
     * @param partitionProcessTime not used by this implementation
     * @throws IllegalStateException if no expected watermark is recorded for the partition
     */
    @Override
    public LongWatermark getExpectedHighWatermark(Partition partition, long tableProcessTime, long partitionProcessTime) {
        String tableKey = PartitionLevelWatermarker.tableKey(partition.getTable());
        String partitionKey = PartitionLevelWatermarker.partitionKey(partition);
        Map<String, Long> tableWatermarks = this.expectedHighWatermarks.getPartitionWatermarks(tableKey);
        Long expected = tableWatermarks == null ? null : tableWatermarks.get(partitionKey);
        if (expected == null) {
            // Fail with context instead of a bare NullPointerException from unboxing.
            throw new IllegalStateException(String.format("No expected high watermark recorded for table: %s, partition: %s", tableKey, partitionKey));
        }
        return new LongWatermark(expected.longValue());
    }

    /**
     * Sets the actual high watermark of {@code wus} to the expected high watermark of its workunit,
     * read as a {@link MultiKeyValueLongWatermark} when {@link #IS_WATERMARK_WORKUNIT_KEY} is true
     * and as a {@link LongWatermark} otherwise.
     */
    @Override
    public void setActualHighWatermark(WorkUnitState wus) {
        if (wus.getPropAsBoolean(IS_WATERMARK_WORKUNIT_KEY)) {
            wus.setActualHighWatermark(wus.getWorkunit().getExpectedHighWatermark(MultiKeyValueLongWatermark.class));
        } else {
            wus.setActualHighWatermark(wus.getWorkunit().getExpectedHighWatermark(LongWatermark.class));
        }
    }

    /** Returns the key identifying {@code table}: its complete name. */
    @VisibleForTesting
    public static String tableKey(Table table) {
        return table.getCompleteName();
    }

    /** Returns the key identifying {@code partition} within its table: its values joined by commas. */
    @VisibleForTesting
    public static String partitionKey(Partition partition) {
        return PARTITION_VALUES_JOINER.join(partition.getValues());
    }

    void setLeastWatermarkToPersistInState(long leastWatermarkToPersistInState) {
        this.leastWatermarkToPersistInState = leastWatermarkToPersistInState;
    }

    TableWatermarks getPreviousWatermarks() {
        return this.previousWatermarks;
    }

    TableWatermarks getExpectedHighWatermarks() {
        return this.expectedHighWatermarks;
    }

    /** Factory creating {@link PartitionLevelWatermarker} instances from a {@link State}. */
    public static class Factory
    implements HiveSourceWatermarkerFactory {
        @Override
        public PartitionLevelWatermarker createFromState(State state) {
            return new PartitionLevelWatermarker(state);
        }
    }

    /**
     * Map from table key to that table's partition watermarks (partition key to watermark).
     * Every method taking a {@code tableKey}, except {@link #setPartitionWatermarks},
     * {@link #hasPartitionWatermarks} and {@link #getPartitionWatermarks}, throws a
     * {@link NullPointerException} when the table has no entry.
     */
    @VisibleForTesting
    static class TableWatermarks
    extends ConcurrentHashMap<String, Map<String, Long>> {
        private static final long serialVersionUID = 1L;

        /** Sets (replacing any existing entry) the partition watermarks of {@code tableKey}. */
        void setPartitionWatermarks(String tableKey, Map<String, Long> partitionWatermarks) {
            this.put(tableKey, partitionWatermarks);
        }

        /** Returns whether an entry exists for {@code tableKey}. */
        boolean hasPartitionWatermarks(String tableKey) {
            return this.containsKey(tableKey);
        }

        /** Removes the watermarks of the given partitions of {@code tableKey}. */
        void removePartitionWatermarks(String tableKey, Collection<String> partitionKeys) {
            this.get(tableKey).keySet().removeAll(partitionKeys);
        }

        /** Adds or overwrites the watermark of one partition of {@code tableKey}. */
        void addPartitionWatermark(String tableKey, String partitionKey, Long watermark) {
            this.get(tableKey).put(partitionKey, watermark);
        }

        /** Returns the watermark of one partition of {@code tableKey}, or null if it has none. */
        Long getPartitionWatermark(String tableKey, String partitionKey) {
            return this.get(tableKey).get(partitionKey);
        }

        /** Returns the partition watermarks of {@code tableKey}, or null if the table has no entry. */
        Map<String, Long> getPartitionWatermarks(String tableKey) {
            return this.get(tableKey);
        }
    }
}

