/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.data.management.retention.dataset;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.data.management.policy.VersionSelectionPolicy;
import org.apache.gobblin.data.management.retention.dataset.ConfigurableCleanableDataset;
import org.apache.gobblin.data.management.retention.dataset.MultiVersionCleanableDatasetBase;
import org.apache.gobblin.data.management.version.FileSystemDatasetVersion;
import org.apache.gobblin.data.management.version.finder.VersionFinder;
import org.apache.gobblin.iceberg.GobblinMCEProducer;
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.metadata.SchemaSource;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.catalog.TableIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CleanableIcebergDataset<T extends FileSystemDatasetVersion>
extends ConfigurableCleanableDataset<T> {
    private static final String RETENTION_INTERVAL_TIME = "retention.interval.time";
    private static final String DEFAULT_RETENTION_INTERVAL_TIME = "10000";
    protected Config config;
    protected Properties jobProps;
    Set<TableIdentifier> expiredTable;

    public CleanableIcebergDataset(FileSystem fs, Properties jobProps, Path datasetRoot, Config config, Logger log) throws IOException {
        super(fs, jobProps, datasetRoot, config, log);
        this.config = config;
        this.jobProps = jobProps;
        this.expiredTable = new HashSet<TableIdentifier>();
    }

    public CleanableIcebergDataset(FileSystem fs, Properties props, Path datasetRoot) throws IOException {
        this(fs, props, datasetRoot, LoggerFactory.getLogger(CleanableIcebergDataset.class));
    }

    public CleanableIcebergDataset(FileSystem fs, Properties props, Path datasetRoot, Logger log) throws IOException {
        this(fs, props, datasetRoot, ConfigFactory.parseProperties((Properties)props), log);
    }

    @Override
    public void clean() throws IOException {
        if (this.isDatasetBlacklisted) {
            this.log.info("Dataset blacklisted. Cleanup skipped for " + this.datasetRoot());
            return;
        }
        boolean atLeastOneFailureSeen = false;
        for (MultiVersionCleanableDatasetBase.VersionFinderAndPolicy versionFinderAndPolicy : this.getVersionFindersAndPolicies()) {
            Config retentionConfig = versionFinderAndPolicy.getConfig();
            Preconditions.checkArgument((retentionConfig != null ? 1 : 0) != 0, (Object)"Must specify retention config for iceberg dataset retention");
            VersionSelectionPolicy selectionPolicy = versionFinderAndPolicy.getVersionSelectionPolicy();
            VersionFinder versionFinder = versionFinderAndPolicy.getVersionFinder();
            if (!selectionPolicy.versionClass().isAssignableFrom(versionFinder.versionClass())) {
                throw new IOException("Incompatible dataset version classes.");
            }
            this.log.info(String.format("Cleaning dataset %s. Using version finder %s and policy %s", this, versionFinder.getClass().getName(), selectionPolicy));
            ArrayList versions = Lists.newArrayList(versionFinder.findDatasetVersions(this));
            if (versions.isEmpty()) {
                this.log.warn("No dataset version can be found. Ignoring.");
                continue;
            }
            Collections.sort(versions, Collections.reverseOrder());
            Collection deletableVersions = selectionPolicy.listSelectedVersions(versions);
            this.cleanImpl(deletableVersions, retentionConfig);
        }
        if (atLeastOneFailureSeen) {
            throw new RuntimeException(String.format("At least one failure happened while processing %s. Look for previous logs for failures", this.datasetRoot()));
        }
        try {
            Thread.sleep(Long.parseLong(this.jobProps.getProperty(RETENTION_INTERVAL_TIME, DEFAULT_RETENTION_INTERVAL_TIME)));
        }
        catch (InterruptedException e) {
            this.log.error("interrupted while sleep");
            throw new IOException(e);
        }
    }

    protected void cleanImpl(Collection<T> deletableVersions, Config retentionConfig) throws IOException {
        ArrayList deletablePrefix = new ArrayList();
        for (FileSystemDatasetVersion version : deletableVersions) {
            version.getPaths().forEach(p -> deletablePrefix.add(this.fs.makeQualified(p).toString()));
        }
        if (deletablePrefix.isEmpty()) {
            return;
        }
        Preconditions.checkArgument((boolean)retentionConfig.hasPath("hive.registration.policy"));
        Preconditions.checkArgument((boolean)retentionConfig.hasPath("hive.database.name"));
        Properties prop = new Properties();
        prop.putAll((Map<?, ?>)this.jobProps);
        State producerState = new State(prop);
        producerState.setProp("hive.registration.policy", (Object)retentionConfig.getString("hive.registration.policy"));
        producerState.setProp("old.files.hive.registration.policy", (Object)retentionConfig.getString("hive.registration.policy"));
        producerState.setProp("hive.database.name", (Object)retentionConfig.getString("hive.database.name"));
        if (retentionConfig.hasPath("additional.hive.database.names")) {
            producerState.setProp("additional.hive.database.names", (Object)retentionConfig.getString("additional.hive.database.names"));
        }
        producerState.setProp("data.publisher.dataset.dir", (Object)this.datasetURN());
        if (!this.simulate) {
            try (GobblinMCEProducer producer = GobblinMCEProducer.getGobblinMCEProducer((State)producerState);){
                producer.sendGMCE(null, null, deletablePrefix, null, OperationType.drop_files, SchemaSource.NONE);
                this.log.info("Sent gmce to delete path {} from icebergTable", (Object)deletablePrefix.stream().map(Object::toString).collect(Collectors.joining(",")));
            }
        } else {
            this.log.info("In simulate mode, going to send gmce to delete path {} from icebergTable", (Object)deletablePrefix.stream().map(Object::toString).collect(Collectors.joining(",")));
        }
    }
}

