/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.spec_store;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.typesafe.config.Config;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
import org.apache.gobblin.runtime.api.InstrumentedSpecStore;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.SpecSerDe;
import org.apache.gobblin.util.PathUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FSSpecStore
extends InstrumentedSpecStore {
    public static final String SPECSTORE_FS_DIR_KEY = "specStore.fs.dir";
    protected final Logger log;
    protected final Config sysConfig;
    protected final FileSystem fs;
    protected final String fsSpecStoreDir;
    protected final Path fsSpecStoreDirPath;
    protected final SpecSerDe specSerDe;

    public FSSpecStore(GobblinInstanceEnvironment env, SpecSerDe specSerDe) throws IOException {
        this(env.getSysConfig().getConfig(), specSerDe, (Optional<Logger>)Optional.absent());
    }

    public FSSpecStore(Config sysConfig, SpecSerDe specSerDe) throws IOException {
        this(sysConfig, specSerDe, (Optional<Logger>)Optional.absent());
    }

    public FSSpecStore(GobblinInstanceEnvironment env, SpecSerDe specSerDe, Optional<Logger> log) throws IOException {
        this(env.getSysConfig().getConfig(), specSerDe, log);
    }

    public FSSpecStore(Config sysConfig, SpecSerDe specSerDe, Optional<Logger> log) throws IOException {
        super(sysConfig, specSerDe);
        Preconditions.checkArgument((boolean)sysConfig.hasPath(SPECSTORE_FS_DIR_KEY), (Object)"FS SpecStore path must be specified.");
        this.log = log.isPresent() ? (Logger)log.get() : LoggerFactory.getLogger(this.getClass());
        this.sysConfig = sysConfig;
        this.specSerDe = specSerDe;
        this.fsSpecStoreDir = this.sysConfig.getString(SPECSTORE_FS_DIR_KEY);
        this.fsSpecStoreDirPath = new Path(this.fsSpecStoreDir);
        this.log.info("FSSpecStore directory is: " + this.fsSpecStoreDir);
        try {
            this.fs = this.fsSpecStoreDirPath.getFileSystem(new Configuration());
        }
        catch (IOException e) {
            throw new RuntimeException("Unable to detect job config directory file system: " + e, e);
        }
        if (!this.fs.exists(this.fsSpecStoreDirPath)) {
            this.log.info("FSSpecStore directory: " + this.fsSpecStoreDir + " did not exist. Creating it.");
            this.fs.mkdirs(this.fsSpecStoreDirPath);
        }
    }

    public static String getSpecGroup(Path specUri) {
        return specUri.getParent().getName();
    }

    public static String getSpecName(Path specUri) {
        return Files.getNameWithoutExtension((String)specUri.getName());
    }

    private Collection<Spec> getAllVersionsOfSpec(Path spec) {
        ArrayList specs = Lists.newArrayList();
        try {
            specs.add(this.readSpecFromFile(spec));
        }
        catch (IOException e) {
            this.log.warn("Spec {} not found.", (Object)spec);
        }
        return specs;
    }

    @Override
    public Collection<Spec> getAllVersionsOfSpec(URI specUri) {
        Preconditions.checkArgument((null != specUri ? 1 : 0) != 0, (Object)"Spec URI should not be null");
        Path specPath = this.getPathForURI(this.fsSpecStoreDirPath, specUri, "");
        return this.getAllVersionsOfSpec(specPath);
    }

    @Override
    public boolean existsImpl(URI specUri) throws IOException {
        Preconditions.checkArgument((null != specUri ? 1 : 0) != 0, (Object)"Spec URI should not be null");
        Path specPath = this.getPathForURI(this.fsSpecStoreDirPath, specUri, "");
        return this.fs.exists(specPath);
    }

    @Override
    public void addSpecImpl(Spec spec) throws IOException {
        Preconditions.checkArgument((null != spec ? 1 : 0) != 0, (Object)"Spec should not be null");
        this.log.info(String.format("Adding Spec with URI: %s in FSSpecStore: %s", spec.getUri(), this.fsSpecStoreDirPath));
        Path specPath = this.getPathForURI(this.fsSpecStoreDirPath, spec.getUri(), spec.getVersion());
        this.writeSpecToFile(specPath, spec);
    }

    @Override
    public boolean deleteSpec(Spec spec) throws IOException {
        Preconditions.checkArgument((null != spec ? 1 : 0) != 0, (Object)"Spec should not be null");
        return this.deleteSpec(spec.getUri(), spec.getVersion());
    }

    @Override
    public boolean deleteSpecImpl(URI specUri) throws IOException {
        Preconditions.checkArgument((null != specUri ? 1 : 0) != 0, (Object)"Spec URI should not be null");
        return this.deleteSpec(specUri, "");
    }

    @Override
    public boolean deleteSpec(URI specUri, String version) throws IOException {
        Preconditions.checkArgument((null != specUri ? 1 : 0) != 0, (Object)"Spec URI should not be null");
        Preconditions.checkArgument((null != version ? 1 : 0) != 0, (Object)"Version should not be null");
        try {
            this.log.info(String.format("Deleting Spec with URI: %s in FSSpecStore: %s", specUri, this.fsSpecStoreDirPath));
            Path specPath = this.getPathForURI(this.fsSpecStoreDirPath, specUri, version);
            return this.fs.delete(specPath, false);
        }
        catch (IOException e) {
            throw new IOException(String.format("Issue in removing Spec: %s for Version: %s", specUri, version), e);
        }
    }

    @Override
    public Spec updateSpecImpl(Spec spec) throws IOException, SpecNotFoundException {
        this.addSpec(spec);
        return spec;
    }

    @Override
    public Spec getSpecImpl(URI specUri) throws SpecNotFoundException {
        Preconditions.checkArgument((null != specUri ? 1 : 0) != 0, (Object)"Spec URI should not be null");
        Collection<Spec> specs = this.getAllVersionsOfSpec(specUri);
        Spec highestVersionSpec = null;
        for (Spec spec : specs) {
            if (null == highestVersionSpec) {
                highestVersionSpec = spec;
                continue;
            }
            if (null == spec.getVersion() || spec.getVersion().compareTo(spec.getVersion()) <= 0) continue;
            highestVersionSpec = spec;
        }
        if (null == highestVersionSpec) {
            throw new SpecNotFoundException(specUri);
        }
        return highestVersionSpec;
    }

    @Override
    public Spec getSpec(URI specUri, String version) throws IOException, SpecNotFoundException {
        Preconditions.checkArgument((null != specUri ? 1 : 0) != 0, (Object)"Spec URI should not be null");
        Preconditions.checkArgument((null != version ? 1 : 0) != 0, (Object)"Version should not be null");
        Path specPath = this.getPathForURI(this.fsSpecStoreDirPath, specUri, version);
        if (!this.fs.exists(specPath)) {
            throw new SpecNotFoundException(specUri);
        }
        return this.readSpecFromFile(specPath);
    }

    @Override
    public Collection<Spec> getSpecsImpl() throws IOException {
        ArrayList specs = Lists.newArrayList();
        try {
            this.getSpecs(this.fsSpecStoreDirPath, specs);
        }
        catch (Exception e) {
            throw new IOException(e);
        }
        return specs;
    }

    @Override
    public Iterator<URI> getSpecURIsImpl() throws IOException {
        final RemoteIterator it = this.fs.listFiles(this.fsSpecStoreDirPath, true);
        return new Iterator<URI>(){

            @Override
            public boolean hasNext() {
                try {
                    return it.hasNext();
                }
                catch (IOException ioe) {
                    throw new RuntimeException("Failed to determine if there's next element available due to:", ioe);
                }
            }

            @Override
            public URI next() {
                try {
                    return FSSpecStore.this.getURIFromPath(((LocatedFileStatus)it.next()).getPath(), FSSpecStore.this.fsSpecStoreDirPath);
                }
                catch (IOException ioe) {
                    throw new RuntimeException("Failed to fetch next element due to:", ioe);
                }
            }
        };
    }

    @Override
    public Iterator<URI> getSpecURIsWithTagImpl(String tag) throws IOException {
        throw new UnsupportedOperationException("Loading specs with tag is not supported in FS-Implementation of SpecStore");
    }

    @Override
    public Optional<URI> getSpecStoreURI() {
        return Optional.of((Object)this.fsSpecStoreDirPath.toUri());
    }

    private void getSpecs(Path directory, Collection<Spec> specs) throws Exception {
        FileStatus[] fileStatuses;
        for (FileStatus fileStatus : fileStatuses = this.fs.listStatus(directory)) {
            if (fileStatus.isDirectory()) {
                this.getSpecs(fileStatus.getPath(), specs);
                continue;
            }
            try {
                specs.add(this.readSpecFromFile(fileStatus.getPath()));
            }
            catch (Exception e) {
                this.log.warn(String.format("Path[%s] cannot be correctly deserialized as Spec", fileStatus.getPath()), (Throwable)e);
            }
        }
    }

    protected Spec readSpecFromFile(Path path) throws IOException {
        Spec spec;
        try (FSDataInputStream fis = this.fs.open(path);){
            spec = this.specSerDe.deserialize(ByteStreams.toByteArray((InputStream)fis));
        }
        return spec;
    }

    protected void writeSpecToFile(Path specPath, Spec spec) throws IOException {
        byte[] serializedSpec = this.specSerDe.serialize(spec);
        try (FSDataOutputStream os = this.fs.create(specPath, true);){
            os.write(serializedSpec);
        }
    }

    protected Path getPathForURI(Path fsSpecStoreDirPath, URI uri, String version) {
        return PathUtils.addExtension((Path)PathUtils.mergePaths((Path)fsSpecStoreDirPath, (Path)new Path(uri)), (String[])new String[]{version});
    }

    protected URI getURIFromPath(Path fsPath, Path fsSpecStoreDirPath) {
        return PathUtils.relativizePath((Path)fsPath, (Path)fsSpecStoreDirPath).toUri();
    }

    @Override
    public int getSizeImpl() throws IOException {
        return this.getSizeImpl(this.fsSpecStoreDirPath);
    }

    @Override
    public Collection<Spec> getSpecsPaginatedImpl(int startOffset, int batchSize) throws IOException {
        if (startOffset < 0 || batchSize < 0) {
            throw new IllegalArgumentException(String.format("Received negative offset or batch size value when they should be >= 0. Offset is %s and batch size is %s", startOffset, batchSize));
        }
        Iterator<URI> uriIterator = this.getSpecURIsImpl();
        ArrayList<URI> sortedUris = new ArrayList<URI>();
        while (uriIterator.hasNext()) {
            sortedUris.add(uriIterator.next());
        }
        sortedUris.sort(URI::compareTo);
        int numElements = 0;
        ArrayList<Spec> batchOfSpecs = new ArrayList<Spec>();
        while (startOffset + numElements < sortedUris.size() && numElements < batchSize) {
            URI currentURI = (URI)sortedUris.get(startOffset + numElements);
            try {
                batchOfSpecs.add(this.getSpecImpl(currentURI));
            }
            catch (SpecNotFoundException e) {
                this.log.warn("Unable to find spec for uri {} so proceeding to next URI. Stacktrace {}", (Object)currentURI, (Object)e);
                continue;
            }
            ++numElements;
        }
        return batchOfSpecs;
    }

    private int getSizeImpl(Path directory) throws IOException {
        FileStatus[] fileStatuses;
        int specs = 0;
        for (FileStatus fileStatus : fileStatuses = this.fs.listStatus(directory)) {
            if (fileStatus.isDirectory()) {
                specs += this.getSizeImpl(fileStatus.getPath());
                continue;
            }
            ++specs;
        }
        return specs;
    }
}

