/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime;

import com.google.common.base.CharMatcher;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValue;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.metastore.nameParser.DatasetUrnStateStoreNameParser;
import org.apache.gobblin.metastore.nameParser.SimpleDatasetUrnStateStoreNameParser;
import org.apache.gobblin.metastore.predicates.StateStorePredicate;
import org.apache.gobblin.metastore.predicates.StoreNamePredicate;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.metastore.filesystem.FsDatasetStateStoreEntryManager;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.WritableShimSerialization;
import org.apache.gobblin.util.executors.IteratorExecutor;
import org.apache.gobblin.util.filters.HiddenFilter;
import org.apache.gobblin.util.hadoop.GobblinSequenceFileReader;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DatasetStateStore} for {@link JobState.DatasetState} objects that is built on top of
 * {@link FsStateStore}, i.e. dataset states are kept as files under a store root directory on a
 * Hadoop {@link FileSystem}.
 *
 * <p>State files use the {@code .jst} suffix; files whose names end in {@code current.jst} are the
 * ones consulted when the latest state of a dataset is requested.
 */
public class FsDatasetStateStore
extends FsStateStore<JobState.DatasetState>
implements DatasetStateStore<JobState.DatasetState> {
    private static final Logger LOGGER = LoggerFactory.getLogger(FsDatasetStateStore.class);
    /** Number of threads handed to the executor that reads state files in {@code getLatestDatasetStatesByUrns}. */
    private int threadPoolOfGettingDatasetState;
    /** Upper bound on cached name parsers; {@code createStateStore} builds its cache with the same value. */
    private static final long CACHE_SIZE = 100L;
    /**
     * Name parsers keyed by "store root dir / store name" path. May be {@code null} (several
     * constructors leave it unset), in which case dataset URNs are used as store names verbatim.
     */
    private LoadingCache<Path, DatasetUrnStateStoreNameParser> stateStoreNameParserLoadingCache;

    /**
     * Reflectively instantiates a dataset state store of type {@code className} from {@code config}.
     *
     * <p>Every entry of {@code config} is copied into a fresh Hadoop {@link Configuration}, which is
     * then used to obtain the state-store {@link FileSystem}. The target class is created through
     * its longest constructor with: the file system, the store root directory, the listing
     * thread-pool size and a cache of dataset-URN name parsers.
     *
     * @param config job/system configuration; must contain {@code state.store.dir}
     * @param className fully qualified name of the store class to instantiate
     * @return the newly created store
     * @throws RuntimeException if the file system cannot be obtained or instantiation fails
     */
    protected static DatasetStateStore<JobState.DatasetState> createStateStore(Config config, String className) {
        Configuration hadoopConf = new Configuration();
        for (Map.Entry<String, ConfigValue> property : config.entrySet()) {
            hadoopConf.set(property.getKey(), property.getValue().unwrapped().toString());
        }

        try {
            String fsUri = ConfigUtils.getString(config, "state.store.fs.uri", "file:///");
            final FileSystem stateStoreFs = FileSystem.get(URI.create(fsUri), hadoopConf);
            String rootDir = config.getString("state.store.dir");
            Integer listingThreads = ConfigUtils.getInt(config, "state.store.threadpoolSizeOfListingFsDatasetStateStore", Integer.valueOf(10));
            final String parserClassName = ConfigUtils.getString(config, "state.store.datasetUrnStateStoreNameParser", SimpleDatasetUrnStateStoreNameParser.class.getName());

            // One parser per store directory, created lazily on first lookup (bounded like CACHE_SIZE).
            CacheLoader<Path, DatasetUrnStateStoreNameParser> parserLoader = new CacheLoader<Path, DatasetUrnStateStoreNameParser>() {
                @Override
                public DatasetUrnStateStoreNameParser load(Path stateStoreDirWithStoreName) throws Exception {
                    Object parser = GobblinConstructorUtils.invokeLongestConstructor(Class.forName(parserClassName), new Object[]{stateStoreFs, stateStoreDirWithStoreName});
                    return (DatasetUrnStateStoreNameParser) parser;
                }
            };
            LoadingCache<Path, DatasetUrnStateStoreNameParser> parserCache = CacheBuilder.newBuilder().maximumSize(100L).build(parserLoader);

            @SuppressWarnings("unchecked")
            DatasetStateStore<JobState.DatasetState> store = (DatasetStateStore<JobState.DatasetState>) GobblinConstructorUtils.invokeLongestConstructor(Class.forName(className), new Object[]{stateStoreFs, rootDir, listingThreads, parserCache});
            return store;
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Failed to instantiate " + className, e);
        }
    }

    /**
     * Creates a store rooted at {@code storeRootDir} on the file system identified by {@code fsUri}.
     * Uses 10 threads for reading dataset states and no dataset-URN name parser cache.
     *
     * @param fsUri URI of the file system holding the store
     * @param storeRootDir root directory of the store
     * @throws IOException if the superclass constructor fails
     */
    public FsDatasetStateStore(String fsUri, String storeRootDir) throws IOException {
        super(fsUri, storeRootDir, JobState.DatasetState.class);
        this.threadPoolOfGettingDatasetState = 10;
        this.useTmpFileForPut = false;
    }

    /**
     * Creates a store on an already-opened file system.
     *
     * @param fs file system holding the store
     * @param storeRootDir root directory of the store
     * @param threadPoolSize number of threads used when reading dataset states; must not be {@code null}
     * @param stateStoreNameParserLoadingCache cache of dataset-URN name parsers, or {@code null} to use URNs verbatim
     */
    public FsDatasetStateStore(FileSystem fs, String storeRootDir, Integer threadPoolSize, LoadingCache<Path, DatasetUrnStateStoreNameParser> stateStoreNameParserLoadingCache) {
        super(fs, storeRootDir, JobState.DatasetState.class);
        this.useTmpFileForPut = false;
        this.threadPoolOfGettingDatasetState = threadPoolSize.intValue();
        this.stateStoreNameParserLoadingCache = stateStoreNameParserLoadingCache;
    }

    /**
     * Creates a store on an already-opened file system without a dataset-URN name parser cache.
     *
     * @param fs file system holding the store
     * @param storeRootDir root directory of the store
     * @param threadPoolSize number of threads used when reading dataset states
     */
    public FsDatasetStateStore(FileSystem fs, String storeRootDir, Integer threadPoolSize) {
        this(fs, storeRootDir, threadPoolSize, (LoadingCache<Path, DatasetUrnStateStoreNameParser>) null);
    }

    /**
     * Creates a store on an already-opened file system with 10 reader threads and no
     * dataset-URN name parser cache.
     *
     * @param fs file system holding the store
     * @param storeRootDir root directory of the store
     */
    public FsDatasetStateStore(FileSystem fs, String storeRootDir) {
        this(fs, storeRootDir, Integer.valueOf(10));
    }

    /**
     * Creates a store from a single store URL, with no dataset-URN name parser cache.
     *
     * @param storeUrl URL of the store, interpreted by the superclass constructor
     * @throws IOException if the superclass constructor fails
     */
    public FsDatasetStateStore(String storeUrl) throws IOException {
        super(storeUrl, JobState.DatasetState.class);
        this.useTmpFileForPut = false;
        // Use the same pool size as the other constructors. Left unset, the field stays at 0 and
        // getLatestDatasetStatesByUrns would ask for an executor with zero threads.
        this.threadPoolOfGettingDatasetState = 10;
    }

    /**
     * Maps a dataset URN to the name used for it inside the state store.
     *
     * <p>When no name-parser cache is configured the URN is returned unchanged. Otherwise the parser
     * for {@code storeRootDir/storeName} is fetched from the cache and asked for the store name.
     *
     * @param storeName name of the store (sub-directory of the store root)
     * @param datasetURN dataset URN to translate
     * @return the state-store name for the URN
     * @throws IOException if the parser cannot be loaded from the cache
     */
    public String sanitizeDatasetStatestoreNameFromDatasetURN(String storeName, String datasetURN) throws IOException {
        if (this.stateStoreNameParserLoadingCache != null) {
            try {
                Path parserKey = new Path(this.storeRootDir, storeName);
                DatasetUrnStateStoreNameParser parser = this.stateStoreNameParserLoadingCache.get(parserKey);
                return parser.getStateStoreNameFromDatasetUrn(datasetURN);
            } catch (ExecutionException e) {
                throw new IOException("Failed to load dataset state store name parser: " + e, e);
            }
        }
        return datasetURN;
    }

    /**
     * Looks up the dataset state stored under {@code stateId} in the given table, comparing keys
     * exactly as they are stored (no name sanitization).
     *
     * @param storeName name of the store
     * @param tableName name of the table (file) inside the store
     * @param stateId key of the state to return
     * @return the matching state, or {@code null} if the table or the key does not exist
     * @throws IOException if reading the table fails
     */
    public JobState.DatasetState get(String storeName, String tableName, String stateId) throws IOException {
        final boolean sanitizeKeyForComparison = false;
        return getInternal(storeName, tableName, stateId, sanitizeKeyForComparison);
    }

    /**
     * Scans the sequence file {@code storeRootDir/storeName/tableName} for the first record whose
     * key equals {@code stateId} and returns its value as a {@link JobState.DatasetState}.
     *
     * <p>Tables whose value class is {@link JobState} are also accepted; such a value is converted
     * with {@code newDatasetState(true)}.
     *
     * @param storeName name of the store
     * @param tableName name of the table (file) inside the store
     * @param stateId key to look for
     * @param sanitizeKeyForComparison if {@code true}, each stored key is first passed through
     *        {@link #sanitizeDatasetStatestoreNameFromDatasetURN(String, String)} before comparison
     * @return the matching state, or {@code null} if the table does not exist or holds no such key
     * @throws IOException if the table cannot be opened, or wrapping any failure while reading records
     */
    public JobState.DatasetState getInternal(String storeName, String tableName, String stateId, boolean sanitizeKeyForComparison) throws IOException {
        Path storeDir = new Path(this.storeRootDir, storeName);
        Path tableFile = new Path(storeDir, tableName);
        if (!this.fs.exists(tableFile)) {
            return null;
        }

        Configuration readerConf = new Configuration(this.conf);
        WritableShimSerialization.addToHadoopConfiguration(readerConf);
        try (SequenceFile.Reader reader = new SequenceFile.Reader(this.fs, tableFile, readerConf)) {
            // The holder type follows the value class recorded in the file.
            Object value;
            if (reader.getValueClass() == JobState.class) {
                value = new JobState();
            } else {
                value = new JobState.DatasetState();
            }

            try {
                Text key = new Text();
                while (reader.next(key)) {
                    String candidateId = key.toString();
                    if (sanitizeKeyForComparison) {
                        candidateId = this.sanitizeDatasetStatestoreNameFromDatasetURN(storeName, candidateId);
                    }
                    // The value is read for every record, matching or not.
                    value = reader.getCurrentValue(value);
                    if (!candidateId.equals(stateId)) {
                        continue;
                    }
                    if (value instanceof JobState.DatasetState) {
                        return (JobState.DatasetState) value;
                    }
                    return ((JobState) value).newDatasetState(true);
                }
                return null;
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
    }

    /**
     * Reads every dataset state stored in the sequence file {@code storeRootDir/storeName/tableName}.
     *
     * <p>The value class recorded in the file must be {@link JobState} or
     * {@link JobState.DatasetState}; a recorded class name starting with {@code gobblin} is treated
     * as the same class under the {@code org.apache.} prefix. {@link JobState} values are converted
     * with {@code newDatasetState(true)}.
     *
     * @param storeName name of the store
     * @param tableName name of the table (file) inside the store
     * @return all states in file order; an empty list if the table does not exist
     * @throws IOException if the table cannot be opened, or wrapping any failure while reading records
     * @throws RuntimeException if the recorded value class is neither of the two supported classes
     */
    public List<JobState.DatasetState> getAll(String storeName, String tableName) throws IOException {
        List<JobState.DatasetState> states = Lists.newArrayList();
        Path tablePath = new Path(new Path(this.storeRootDir, storeName), tableName);
        if (!this.fs.exists(tablePath)) {
            return states;
        }
        Configuration deserializeConfig = new Configuration(this.conf);
        WritableShimSerialization.addToHadoopConfiguration(deserializeConfig);
        try (GobblinSequenceFileReader reader = new GobblinSequenceFileReader(this.fs, tablePath, deserializeConfig)) {
            String className = reader.getValueClassName();
            if (className.startsWith("gobblin")) {
                LOGGER.warn("There's old JobState with no apache package name being read while we cast them at runtime");
                className = "org.apache." + className;
            }
            if (!className.equals(JobState.class.getName()) && !className.equals(JobState.DatasetState.class.getName())) {
                throw new RuntimeException("There is a mismatch in the Class Type of state in state-store and that in runtime");
            }
            Object writable = reader.getValueClass() == JobState.class ? new JobState() : new JobState.DatasetState();
            try {
                Text key = new Text();
                while (reader.next(key)) {
                    writable = reader.getCurrentValue(writable);
                    if (writable instanceof JobState.DatasetState) {
                        states.add((JobState.DatasetState) writable);
                        // Use a fresh holder so the instance just added is not overwritten by the next read.
                        writable = new JobState.DatasetState();
                    } else {
                        // The holder is typed Object, so it must be cast before the conversion call.
                        states.add(((JobState) writable).newDatasetState(true));
                        writable = new JobState();
                    }
                }
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
        return states;
    }

    /**
     * Returns all dataset states of the given store by delegating to the superclass implementation.
     *
     * @param storeName name of the store
     * @return the states reported by the superclass for {@code storeName}
     * @throws IOException if the superclass lookup fails
     */
    public List<JobState.DatasetState> getAll(String storeName) throws IOException {
        List<JobState.DatasetState> allStates = super.getAll(storeName);
        return allStates;
    }

    /**
     * Collects the latest dataset state of every dataset of a job, keyed by dataset URN.
     *
     * <p>All files under {@code storeRootDir/jobName} whose names end in {@code current.jst} are read
     * in parallel on {@code threadPoolOfGettingDatasetState} daemon threads; the first state found in
     * each file is recorded under its dataset URN. If more than one URN was found, the entry for the
     * empty URN is dropped.
     *
     * @param jobName name of the job, which is also the store name
     * @return latest states by dataset URN; empty if the job has no store directory or no matching files
     * @throws IOException if listing or reading fails, or if the calling thread is interrupted while
     *         waiting for the readers (the interrupt flag is restored before throwing)
     */
    public Map<String, JobState.DatasetState> getLatestDatasetStatesByUrns(final String jobName) throws IOException {
        Path stateStorePath = new Path(this.storeRootDir, jobName);
        if (!this.fs.exists(stateStorePath)) {
            return ImmutableMap.of();
        }
        FileStatus[] stateStoreFileStatuses = this.fs.listStatus(stateStorePath, new PathFilter() {
            @Override
            public boolean accept(Path path) {
                return path.getName().endsWith("current.jst");
            }
        });
        if (stateStoreFileStatuses == null || stateStoreFileStatuses.length == 0) {
            return ImmutableMap.of();
        }

        // Written concurrently by the reader tasks below.
        final Map<String, JobState.DatasetState> datasetStatesByUrns = new ConcurrentHashMap<>();
        Iterator<Callable<Void>> callableIterator = Iterators.transform(Arrays.asList(stateStoreFileStatuses).iterator(), new Function<FileStatus, Callable<Void>>() {
            @Override
            public Callable<Void> apply(final FileStatus stateStoreFileStatus) {
                return new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        Path stateStoreFilePath = stateStoreFileStatus.getPath();
                        LOGGER.info("Getting dataset states from: {}", stateStoreFilePath);
                        List<JobState.DatasetState> previousDatasetStates = FsDatasetStateStore.this.getAll(jobName, stateStoreFilePath.getName());
                        if (!previousDatasetStates.isEmpty()) {
                            // Only the first state of each "current" file is used.
                            JobState.DatasetState previousDatasetState = previousDatasetStates.get(0);
                            datasetStatesByUrns.put(previousDatasetState.getDatasetUrn(), previousDatasetState);
                        }
                        return null;
                    }
                };
            }
        });

        try {
            IteratorExecutor<Void> executor = new IteratorExecutor<>(callableIterator, this.threadPoolOfGettingDatasetState, ExecutorsUtils.newDaemonThreadFactory(Optional.of(LOGGER), Optional.of("GetFsDatasetStateStore-")));
            int maxNumberOfErrorLogs = 10;
            IteratorExecutor.logAndThrowFailures(executor.executeAndGetResults(), LOGGER, maxNumberOfErrorLogs);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new IOException("Failed to get latest dataset states.", e);
        }

        if (datasetStatesByUrns.size() > 1) {
            datasetStatesByUrns.remove("");
        }
        return datasetStatesByUrns;
    }

    /**
     * Returns the latest state of one dataset by reading its {@code current.jst} file.
     *
     * <p>For a null or empty URN the file is {@code current.jst}. Otherwise colons in the URN are
     * replaced by dots, the result is passed through
     * {@link #sanitizeDatasetStatestoreNameFromDatasetURN(String, String)}, and the file is
     * {@code <name>-current.jst}. The record is looked up under the original {@code datasetUrn}.
     *
     * @param storeName name of the store
     * @param datasetUrn dataset URN; may be null or empty
     * @return the latest state, or {@code null} if none is found
     * @throws IOException if name resolution or reading fails
     */
    public JobState.DatasetState getLatestDatasetState(String storeName, String datasetUrn) throws IOException {
        String alias;
        if (Strings.isNullOrEmpty(datasetUrn)) {
            alias = "current.jst";
        } else {
            String fileSafeUrn = CharMatcher.is(':').replaceFrom(datasetUrn, '.');
            String storeFileStem = this.sanitizeDatasetStatestoreNameFromDatasetURN(storeName, fileSafeUrn);
            alias = storeFileStem + "-" + "current" + ".jst";
        }
        return this.get(storeName, alias, datasetUrn);
    }

    /**
     * Writes a dataset state to the store of its job and points the "current" alias at it.
     *
     * <p>Colons in {@code datasetUrn} are replaced by dots. The table is named
     * {@code <sanitizedJobId>.jst} for an empty URN and {@code <storeName>-<sanitizedJobId>.jst}
     * otherwise, where {@code <storeName>} comes from
     * {@link #sanitizeDatasetStatestoreNameFromDatasetURN(String, String)}. If that store name
     * differs from the URN and a {@code <urn>-current.jst} path still exists, it is deleted.
     *
     * @param datasetUrn URN of the dataset the state belongs to
     * @param datasetState state to persist; supplies the job name and job id
     * @throws IOException if any store or file-system operation fails
     */
    public void persistDatasetState(String datasetUrn, JobState.DatasetState datasetState) throws IOException {
        final String jobName = datasetState.getJobName();
        final String jobId = datasetState.getJobId();
        final String fileSafeUrn = CharMatcher.is(':').replaceFrom(datasetUrn, '.');
        final String storeFileStem = this.sanitizeDatasetStatestoreNameFromDatasetURN(jobName, fileSafeUrn);
        final boolean hasUrn = !Strings.isNullOrEmpty(fileSafeUrn);

        String tableName;
        if (hasUrn) {
            tableName = storeFileStem + "-" + this.sanitizeJobId(jobId) + ".jst";
        } else {
            tableName = this.sanitizeJobId(jobId) + ".jst";
        }

        LOGGER.info("Persisting " + tableName + " to the job state store");
        this.put(jobName, tableName, datasetState);
        this.createAlias(jobName, tableName, getAliasName(storeFileStem));

        // Alias path that would have been used had the URN itself served as the store name.
        Path jobDir = new Path(this.storeRootDir, jobName);
        Path originalDatasetUrnPath = new Path(jobDir, getAliasName(fileSafeUrn));
        boolean storeNameDiffersFromUrn = hasUrn && !storeFileStem.equals(fileSafeUrn);
        if (storeNameDiffersFromUrn && this.fs.exists(originalDatasetUrnPath)) {
            LOGGER.info("Removing previous datasetUrn path: " + originalDatasetUrnPath);
            this.fs.delete(originalDatasetUrnPath, true);
        }
    }

    /**
     * Makes a job id usable inside a table name by replacing every {@code '-'} and {@code '/'}
     * with {@code '_'}.
     *
     * @param jobId job id to sanitize; must not be {@code null}
     * @return the job id with dashes and slashes replaced by underscores
     */
    private String sanitizeJobId(String jobId) {
        String withoutDashes = jobId.replace('-', '_');
        return withoutDashes.replace('/', '_');
    }

    /**
     * Hands the given dataset URNs to the name parser of {@code storeName} for persistence.
     * Does nothing when no name-parser cache is configured.
     *
     * @param storeName name of the store
     * @param datasetUrns dataset URNs to persist
     * @throws IOException if the parser cannot be loaded from the cache
     */
    public void persistDatasetURNs(String storeName, Collection<String> datasetUrns) throws IOException {
        if (this.stateStoreNameParserLoadingCache != null) {
            try {
                Path parserKey = new Path(this.storeRootDir, storeName);
                DatasetUrnStateStoreNameParser parser = this.stateStoreNameParserLoadingCache.get(parserKey);
                parser.persistDatasetUrns(datasetUrns);
            } catch (ExecutionException e) {
                throw new IOException("Failed to persist datasetUrns.", e);
            }
        }
    }

    /**
     * Builds the file name of the "current" alias for a dataset state store name.
     *
     * @param datasetStatestoreName store name of the dataset; may be null or empty
     * @return {@code current.jst} for a null or empty name, otherwise {@code <name>-current.jst}
     */
    private static String getAliasName(String datasetStatestoreName) {
        if (Strings.isNullOrEmpty(datasetStatestoreName)) {
            return "current.jst";
        }
        return datasetStatestoreName + "-" + "current" + ".jst";
    }

    /**
     * Lists the tables of the store and returns an entry manager for each one accepted by
     * {@code predicate}.
     *
     * <p>If {@code predicate} is a {@link StoreNamePredicate}, only the directory of the store it
     * names is listed; otherwise every non-hidden child of the store root is treated as a store.
     * Directories that cannot be listed contribute no entries.
     *
     * @param predicate filter applied to every candidate entry
     * @return entry managers for all matching tables
     * @throws IOException declared for interface compatibility
     */
    public List<FsDatasetStateStoreEntryManager> getMetadataForTables(StateStorePredicate predicate) throws IOException {
        // Neither branch can yield null: Stream.of and lsStream(...).map always return a stream.
        Stream<Path> stores = predicate instanceof StoreNamePredicate
                ? Stream.of(new Path(this.storeRootDir, ((StoreNamePredicate) predicate).getStoreName()))
                : this.lsStream(new Path(this.storeRootDir)).map(FileStatus::getPath);

        // lsStream yields FileStatus objects, one per table file inside each store directory.
        Stream<FileStatus> tables = stores.flatMap(this::lsStream);
        return tables
                .map(this::parseMetadataFromPath)
                .filter(entry -> predicate.apply(entry))
                .collect(Collectors.toList());
    }

    /**
     * Lists the non-hidden children of {@code path} as a stream.
     *
     * <p>Listing is best effort: if the file system reports no result or throws, an empty stream is
     * returned so that callers building a larger listing can continue. Failures are logged rather
     * than silently dropped.
     *
     * @param path directory to list
     * @return the children accepted by {@link HiddenFilter}, or an empty stream
     */
    private Stream<FileStatus> lsStream(Path path) {
        try {
            FileStatus[] ls = this.fs.listStatus(path, new HiddenFilter());
            return ls == null ? Stream.<FileStatus>empty() : Arrays.stream(ls);
        } catch (IOException ioe) {
            // Still best effort, but leave a trace so an unreadable directory is not mistaken for an empty one.
            LOGGER.warn("Failed to list " + path + "; treating it as empty", ioe);
            return Stream.empty();
        }
    }

    /**
     * Wraps the file status of a table in an entry manager bound to this store.
     *
     * @param status file status of the table
     * @return a new entry manager for {@code status}
     */
    private FsDatasetStateStoreEntryManager parseMetadataFromPath(FileStatus status) {
        FsDatasetStateStoreEntryManager entryManager = new FsDatasetStateStoreEntryManager(status, this);
        return entryManager;
    }
}

