/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.data.management.conversion.hive.publisher;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.net.URI;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.conversion.hive.avro.AvroSchemaManager;
import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
import org.apache.gobblin.data.management.conversion.hive.events.EventWorkunitUtils;
import org.apache.gobblin.data.management.conversion.hive.query.HiveAvroORCQueryGenerator;
import org.apache.gobblin.data.management.conversion.hive.source.HiveSource;
import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit;
import org.apache.gobblin.data.management.conversion.hive.utils.LineageUtils;
import org.apache.gobblin.data.management.conversion.hive.watermarker.HiveSourceWatermarker;
import org.apache.gobblin.data.management.conversion.hive.watermarker.HiveSourceWatermarkerFactory;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.hive.HiveMetastoreClientPool;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.metrics.event.sla.SlaEventSubmitter;
import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.util.AutoReturnableObject;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.HiveJdbcConnector;
import org.apache.gobblin.util.WriterUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveConvertPublisher
extends DataPublisher {
    private static final Logger log = LoggerFactory.getLogger(HiveConvertPublisher.class);
    private final AvroSchemaManager avroSchemaManager;
    private final HiveJdbcConnector hiveJdbcConnector;
    private MetricContext metricContext;
    private EventSubmitter eventSubmitter;
    private final FileSystem fs;
    private final HiveSourceWatermarker watermarker;
    private final HiveMetastoreClientPool pool;
    private final Optional<LineageInfo> lineageInfo;
    public static final String PARTITION_PARAMETERS_WHITELIST = "hive.conversion.partitionParameters.whitelist";
    public static final String PARTITION_PARAMETERS_BLACKLIST = "hive.conversion.partitionParameters.blacklist";
    public static final String COMPLETE_SOURCE_PARTITION_NAME = "completeSourcePartitionName";
    public static final String COMPLETE_DEST_PARTITION_NAME = "completeDestPartitionName";
    private static final Splitter COMMA_SPLITTER = Splitter.on((String)",").omitEmptyStrings().trimResults();
    private static final Splitter At_SPLITTER = Splitter.on((String)"@").omitEmptyStrings().trimResults();
    private static final Predicate<WorkUnitState> UNSUCCESSFUL_WORKUNIT = new Predicate<WorkUnitState>(){

        public boolean apply(WorkUnitState input) {
            return null == input || !WorkUnitState.WorkingState.SUCCESSFUL.equals((Object)input.getWorkingState());
        }
    };
    private static final Ordering<WorkUnitState> PARTITION_PUBLISH_ORDERING = Ordering.natural().nullsLast().onResultOf((Function)new Function<WorkUnitState, String>(){

        public String apply(@Nonnull WorkUnitState wus) {
            return (String)new HiveWorkUnit(wus.getWorkunit()).getPartitionName().orNull();
        }
    });

    public HiveConvertPublisher(State state) throws IOException {
        super(state);
        this.avroSchemaManager = new AvroSchemaManager(FileSystem.get((Configuration)HadoopUtils.newConfiguration()), state);
        this.metricContext = Instrumented.getMetricContext((State)state, HiveConvertPublisher.class);
        this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, "gobblin.hive.conversion").build();
        this.lineageInfo = state instanceof SourceState ? LineageInfo.getLineageInfo((SharedResourcesBroker)((SourceState)state).getBroker()) : (state instanceof WorkUnitState ? LineageInfo.getLineageInfo((SharedResourcesBroker)((WorkUnitState)state).getTaskBrokerNullable()) : Optional.absent());
        Configuration conf = new Configuration();
        Optional uri = Optional.fromNullable((Object)this.state.getProp("writer.fs.uri"));
        this.fs = uri.isPresent() ? FileSystem.get((URI)URI.create((String)uri.get()), (Configuration)conf) : FileSystem.get((Configuration)conf);
        try {
            this.hiveJdbcConnector = HiveJdbcConnector.newConnectorWithProps((Properties)state.getProperties());
        }
        catch (SQLException e) {
            throw new RuntimeException(e);
        }
        this.watermarker = ((HiveSourceWatermarkerFactory)GobblinConstructorUtils.invokeConstructor(HiveSourceWatermarkerFactory.class, (String)state.getProp("hive.source.watermarker.factoryClass", HiveSource.DEFAULT_HIVE_SOURCE_WATERMARKER_FACTORY_CLASS), (Object[])new Object[0])).createFromState(state);
        this.pool = HiveMetastoreClientPool.get((Properties)state.getProperties(), (Optional)Optional.fromNullable((Object)state.getProperties().getProperty("hive.dataset.hive.metastore.uri")));
    }

    public void initialize() throws IOException {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void publishData(Collection<? extends WorkUnitState> states) throws IOException {
        block27: {
            LinkedHashSet cleanUpQueries = Sets.newLinkedHashSet();
            LinkedHashSet publishQueries = Sets.newLinkedHashSet();
            ArrayList directoriesToDelete = Lists.newArrayList();
            try {
                if (Iterables.tryFind(states, UNSUCCESSFUL_WORKUNIT).isPresent()) {
                    for (WorkUnitState workUnitState : states) {
                        QueryBasedHivePublishEntity publishEntity = HiveAvroORCQueryGenerator.deserializePublishCommands((State)workUnitState);
                        if (publishEntity.getCleanupQueries() != null) {
                            cleanUpQueries.addAll(publishEntity.getCleanupQueries());
                        }
                        if (publishEntity.getCleanupDirectories() != null) {
                            directoriesToDelete.addAll(publishEntity.getCleanupDirectories());
                        }
                        EventWorkunitUtils.setBeginPublishDDLExecuteTimeMetadata((State)workUnitState, System.currentTimeMillis());
                        workUnitState.setWorkingState(WorkUnitState.WorkingState.FAILED);
                        if (workUnitState.getPropAsBoolean("hive.source.watermark.isWatermarkWorkUnit")) continue;
                        try {
                            new SlaEventSubmitter(this.eventSubmitter, "gobblin.hive.conversion.ConversionFailed", workUnitState.getProperties()).submit();
                        }
                        catch (Exception e) {
                            log.error("Failed while emitting SLA event, but ignoring and moving forward to curate all clean up comamnds", (Throwable)e);
                        }
                    }
                    break block27;
                }
                for (WorkUnitState workUnitState : PARTITION_PUBLISH_ORDERING.sortedCopy(states)) {
                    QueryBasedHivePublishEntity publishEntity = HiveAvroORCQueryGenerator.deserializePublishCommands((State)workUnitState);
                    if (publishEntity.getCleanupQueries() != null) {
                        cleanUpQueries.addAll(publishEntity.getCleanupQueries());
                    }
                    if (publishEntity.getCleanupDirectories() != null) {
                        directoriesToDelete.addAll(publishEntity.getCleanupDirectories());
                    }
                    if (publishEntity.getPublishDirectories() != null) {
                        Map<String, String> publishDirectories = publishEntity.getPublishDirectories();
                        for (Map.Entry<String, String> publishDir : publishDirectories.entrySet()) {
                            this.moveDirectory(publishDir.getKey(), publishDir.getValue());
                        }
                    }
                    if (publishEntity.getPublishQueries() == null) continue;
                    publishQueries.addAll(publishEntity.getPublishQueries());
                }
                for (WorkUnitState workUnitState : PARTITION_PUBLISH_ORDERING.sortedCopy(states)) {
                    if (HiveAvroORCQueryGenerator.deserializePublishCommands((State)workUnitState).getPublishQueries() == null) continue;
                    EventWorkunitUtils.setBeginPublishDDLExecuteTimeMetadata((State)workUnitState, System.currentTimeMillis());
                }
                this.executeQueries(Lists.newArrayList((Iterable)publishQueries));
                for (WorkUnitState workUnitState : PARTITION_PUBLISH_ORDERING.sortedCopy(states)) {
                    if (HiveAvroORCQueryGenerator.deserializePublishCommands((State)workUnitState).getPublishQueries() != null) {
                        EventWorkunitUtils.setEndPublishDDLExecuteTimeMetadata((State)workUnitState, System.currentTimeMillis());
                    }
                    workUnitState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
                    this.watermarker.setActualHighWatermark(workUnitState);
                    if (workUnitState.getPropAsBoolean("hive.source.watermark.isWatermarkWorkUnit")) continue;
                    EventWorkunitUtils.setIsFirstPublishMetadata(workUnitState);
                    try {
                        new SlaEventSubmitter(this.eventSubmitter, "gobblin.hive.conversion.ConversionSuccessful", workUnitState.getProperties()).submit();
                    }
                    catch (Exception e) {
                        log.error("Failed while emitting SLA event, but ignoring and moving forward to curate all clean up commands", (Throwable)e);
                    }
                    if (!LineageUtils.shouldSetLineageInfo(workUnitState)) continue;
                    HiveConvertPublisher.setDestLineageInfo(workUnitState, this.lineageInfo);
                }
            }
            finally {
                this.preservePartitionParams(states);
                try {
                    this.executeQueries(Lists.newArrayList((Iterable)cleanUpQueries));
                }
                catch (Exception e) {
                    log.error("Failed to cleanup staging entities in Hive metastore.", (Throwable)e);
                }
                try {
                    this.deleteDirectories(directoriesToDelete);
                }
                catch (Exception e) {
                    log.error("Failed to cleanup staging directories.", (Throwable)e);
                }
            }
        }
    }

    @VisibleForTesting
    public static void setDestLineageInfo(WorkUnitState wus, Optional<LineageInfo> lineageInfo) {
        HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(wus.getWorkunit());
        ConvertibleHiveDataset convertibleHiveDataset = (ConvertibleHiveDataset)hiveWorkUnit.getHiveDataset();
        List<DatasetDescriptor> destDatasets = convertibleHiveDataset.getDestDatasets();
        for (int i = 0; i < destDatasets.size(); ++i) {
            if (!lineageInfo.isPresent()) continue;
            ((LineageInfo)lineageInfo.get()).putDestination(destDatasets.get(i), i + 1, (State)wus);
        }
    }

    @VisibleForTesting
    public void preservePartitionParams(Collection<? extends WorkUnitState> states) {
        for (WorkUnitState workUnitState : states) {
            String completeDestPartitionName;
            if (workUnitState.getWorkingState() != WorkUnitState.WorkingState.COMMITTED || !workUnitState.contains(COMPLETE_SOURCE_PARTITION_NAME) || !workUnitState.contains(COMPLETE_DEST_PARTITION_NAME) || !workUnitState.contains(PARTITION_PARAMETERS_WHITELIST) && !workUnitState.contains(PARTITION_PARAMETERS_BLACKLIST)) continue;
            List whitelist = COMMA_SPLITTER.splitToList((CharSequence)workUnitState.getProp(PARTITION_PARAMETERS_WHITELIST, ""));
            List blacklist = COMMA_SPLITTER.splitToList((CharSequence)workUnitState.getProp(PARTITION_PARAMETERS_BLACKLIST, ""));
            String completeSourcePartitionName = workUnitState.getProp(COMPLETE_SOURCE_PARTITION_NAME);
            if (this.copyPartitionParams(completeSourcePartitionName, completeDestPartitionName = workUnitState.getProp(COMPLETE_DEST_PARTITION_NAME), whitelist, blacklist)) continue;
            log.warn("Unable to copy partition parameters from " + completeSourcePartitionName + " to " + completeDestPartitionName);
        }
    }

    @VisibleForTesting
    public boolean copyPartitionParams(String completeSourcePartitionName, String completeDestPartitionName, List<String> whitelist, List<String> blacklist) {
        Optional<Partition> sourcePartitionOptional = this.getPartitionObject(completeSourcePartitionName);
        Optional<Partition> destPartitionOptional = this.getPartitionObject(completeDestPartitionName);
        if (!sourcePartitionOptional.isPresent() || !destPartitionOptional.isPresent()) {
            return false;
        }
        Map sourceParams = ((Partition)sourcePartitionOptional.get()).getParameters();
        Map destParams = ((Partition)destPartitionOptional.get()).getParameters();
        for (Map.Entry param : sourceParams.entrySet()) {
            if (!this.matched(whitelist, blacklist, (String)param.getKey())) continue;
            destParams.put(param.getKey(), param.getValue());
        }
        ((Partition)destPartitionOptional.get()).setParameters(destParams);
        if (!this.dropPartition(completeDestPartitionName)) {
            return false;
        }
        return this.addPartition((Partition)destPartitionOptional.get(), completeDestPartitionName);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @VisibleForTesting
    public boolean dropPartition(String completePartitionName) {
        List partitionList = At_SPLITTER.splitToList((CharSequence)completePartitionName);
        if (partitionList.size() != 3) {
            log.warn("Invalid partition name " + completePartitionName);
            return false;
        }
        try (AutoReturnableObject client = this.pool.getClient();){
            ((IMetaStoreClient)client.get()).dropPartition((String)partitionList.get(0), (String)partitionList.get(1), (String)partitionList.get(2), false);
            boolean bl = true;
            return bl;
        }
        catch (IOException | TException e) {
            log.warn("Unable to drop Partition " + completePartitionName);
            return false;
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @VisibleForTesting
    public boolean addPartition(Partition destPartition, String completePartitionName) {
        try (AutoReturnableObject client = this.pool.getClient();){
            ((IMetaStoreClient)client.get()).add_partition(destPartition);
            boolean bl = true;
            return bl;
        }
        catch (IOException | TException e) {
            log.warn("Unable to add Partition " + completePartitionName);
            return false;
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @VisibleForTesting
    public Optional<Partition> getPartitionObject(String completePartitionName) {
        try (AutoReturnableObject client = this.pool.getClient();){
            List partitionList = At_SPLITTER.splitToList((CharSequence)completePartitionName);
            if (partitionList.size() != 3) {
                log.warn("Invalid partition name " + completePartitionName);
                Optional optional2 = Optional.absent();
                return optional2;
            }
            Partition sourcePartition = ((IMetaStoreClient)client.get()).getPartition((String)partitionList.get(0), (String)partitionList.get(1), (String)partitionList.get(2));
            Optional optional = Optional.fromNullable((Object)sourcePartition);
            return optional;
        }
        catch (IOException | TException e) {
            log.warn("Unable to get partition object from metastore for partition " + completePartitionName);
            return Optional.absent();
        }
    }

    @VisibleForTesting
    private boolean matched(List<String> whitelist, List<String> blacklist, String key) {
        for (String patternStr : blacklist) {
            if (!Pattern.matches(this.getRegexPatternString(patternStr), key)) continue;
            return false;
        }
        for (String patternStr : whitelist) {
            if (!Pattern.matches(this.getRegexPatternString(patternStr), key)) continue;
            return true;
        }
        return false;
    }

    @VisibleForTesting
    private String getRegexPatternString(String patternStr) {
        patternStr = patternStr.replace("*", ".*");
        StringBuilder builder = new StringBuilder();
        builder.append("\\b").append(patternStr).append("\\b");
        return patternStr;
    }

    private void moveDirectory(String sourceDir, String targetDir) throws IOException {
        if (this.fs.exists(new Path(targetDir))) {
            this.deleteDirectory(targetDir);
        }
        WriterUtils.mkdirsWithRecursivePermission((FileSystem)this.fs, (Path)new Path(targetDir).getParent(), (FsPermission)FsPermission.getCachePoolDefault());
        log.info("Moving directory: " + sourceDir + " to: " + targetDir);
        if (!this.fs.rename(new Path(sourceDir), new Path(targetDir))) {
            throw new IOException(String.format("Unable to move %s to %s", sourceDir, targetDir));
        }
    }

    private void deleteDirectories(List<String> directoriesToDelete) throws IOException {
        for (String directory : directoriesToDelete) {
            this.deleteDirectory(directory);
        }
    }

    private void deleteDirectory(String dirToDelete) throws IOException {
        if (StringUtils.isBlank((String)dirToDelete)) {
            return;
        }
        log.info("Going to delete existing partition data: " + dirToDelete);
        this.fs.delete(new Path(dirToDelete), true);
    }

    private void executeQueries(List<String> queries) {
        if (null == queries || queries.size() == 0) {
            return;
        }
        try {
            this.hiveJdbcConnector.executeStatements(queries.toArray(new String[queries.size()]));
        }
        catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    public void publishMetadata(Collection<? extends WorkUnitState> states) throws IOException {
    }

    public void close() throws IOException {
        this.avroSchemaManager.cleanupTempSchemas();
        this.hiveJdbcConnector.close();
    }
}

