/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.api;

import com.typesafe.config.Config;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecConsumer;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
import org.apache.gobblin.util.CompletedFuture;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.filters.HiddenFilter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FsSpecConsumer
implements SpecConsumer<Spec> {
    private static final Logger log = LoggerFactory.getLogger(FsSpecConsumer.class);
    public static final String SPEC_PATH_KEY = "gobblin.cluster.specConsumer.path";
    private final Path specDirPath;
    private final FileSystem fs;
    private Map<URI, Path> specToPathMap = new HashMap<URI, Path>();

    public FsSpecConsumer(Config config) {
        this(null, config);
    }

    public FsSpecConsumer(@Nullable FileSystem fs, Config config) {
        this.specDirPath = new Path(config.getString(SPEC_PATH_KEY));
        try {
            FileSystem fileSystem = this.fs = fs == null ? FileSystem.get((Configuration)new Configuration()) : fs;
            if (!this.fs.exists(this.specDirPath)) {
                this.fs.mkdirs(this.specDirPath);
            }
        }
        catch (IOException e) {
            throw new RuntimeException("Unable to detect spec directory file system: " + e, e);
        }
    }

    public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
        FileStatus[] fileStatuses;
        ArrayList<ImmutablePair> specList = new ArrayList<ImmutablePair>();
        try {
            fileStatuses = this.fs.listStatus(this.specDirPath, (PathFilter)new HiddenFilter());
        }
        catch (IOException e) {
            log.error("Error when listing files at path: {}", (Object)this.specDirPath.toString(), (Object)e);
            return null;
        }
        log.info("Found {} files at path {}", (Object)fileStatuses.length, (Object)this.specDirPath.toString());
        Arrays.sort(fileStatuses, Comparator.comparingLong(FileStatus::getModificationTime));
        for (FileStatus fileStatus : fileStatuses) {
            JobSpec.Builder jobSpecBuilder;
            AvroJobSpec avroJobSpec;
            block8: {
                DataFileReader dataFileReader;
                try {
                    dataFileReader = new DataFileReader((SeekableInput)new FsInput(fileStatus.getPath(), this.fs.getConf()), (DatumReader)new SpecificDatumReader());
                }
                catch (IOException e) {
                    log.error("Error creating DataFileReader for: {}", (Object)fileStatus.getPath().toString(), (Object)e);
                    continue;
                }
                avroJobSpec = null;
                if (dataFileReader.hasNext()) {
                    avroJobSpec = (AvroJobSpec)((Object)dataFileReader.next());
                }
                if (avroJobSpec == null) continue;
                jobSpecBuilder = new JobSpec.Builder(avroJobSpec.getUri());
                Properties props = new Properties();
                props.putAll(avroJobSpec.getProperties());
                jobSpecBuilder.withJobCatalogURI(avroJobSpec.getUri()).withVersion(avroJobSpec.getVersion()).withDescription(avroJobSpec.getDescription()).withConfigAsProperties(props).withConfig(ConfigUtils.propertiesToConfig((Properties)props));
                try {
                    if (avroJobSpec.getTemplateUri().isEmpty()) break block8;
                    jobSpecBuilder.withTemplate(new URI(avroJobSpec.getTemplateUri()));
                }
                catch (URISyntaxException u) {
                    log.error("Error building a job spec: ", (Throwable)u);
                    continue;
                }
            }
            String verbName = avroJobSpec.getMetadata().get("Verb");
            SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf((String)verbName);
            JobSpec jobSpec = jobSpecBuilder.build();
            log.debug("Successfully built jobspec: {}", (Object)jobSpec.getUri().toString());
            specList.add(new ImmutablePair((Object)verb, (Object)jobSpec));
            this.specToPathMap.put(jobSpec.getUri(), fileStatus.getPath());
        }
        return new CompletedFuture(specList, null);
    }

    public void commit(Spec spec) throws IOException {
        Path path = this.specToPathMap.get(spec.getUri());
        if (path != null) {
            log.debug("Calling delete on path: {}", (Object)path.toString());
            this.fs.delete(path, false);
        } else {
            log.error("No path found for job: {}", (Object)spec.getUri().toString());
        }
    }
}

