/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.api;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
import org.apache.gobblin.util.CompletedFuture;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FsSpecProducer
implements SpecProducer<Spec> {
    private static final Logger log = LoggerFactory.getLogger(FsSpecProducer.class);
    private Path specConsumerPath;
    private FileSystem fs;

    public FsSpecProducer(Config config) {
        this(null, config);
    }

    public FsSpecProducer(@Nullable FileSystem fs, Config config) {
        String specConsumerDir = ConfigUtils.getString((Config)config, (String)"gobblin.cluster.specConsumer.path", (String)"");
        Preconditions.checkArgument((!Strings.isNullOrEmpty((String)specConsumerDir) ? 1 : 0) != 0, (Object)"Missing argument: gobblin.cluster.specConsumer.path");
        this.specConsumerPath = new Path(specConsumerDir);
        try {
            this.fs = fs == null ? FileSystem.get((Configuration)new Configuration()) : fs;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public Future<?> addSpec(Spec addedSpec) {
        return this.writeSpec(addedSpec, SpecExecutor.Verb.ADD);
    }

    public Future<?> updateSpec(Spec updatedSpec) {
        return this.writeSpec(updatedSpec, SpecExecutor.Verb.UPDATE);
    }

    private Future<?> writeSpec(Spec spec, SpecExecutor.Verb verb) {
        if (spec instanceof JobSpec) {
            try {
                AvroJobSpec avroJobSpec = this.convertToAvroJobSpec((JobSpec)spec, verb);
                this.writeAvroJobSpec(avroJobSpec);
                return new CompletedFuture((Object)Boolean.TRUE, null);
            }
            catch (IOException e) {
                log.error("Exception encountered when adding Spec {}", (Object)spec);
                return new CompletedFuture((Object)Boolean.TRUE, (Throwable)e);
            }
        }
        throw new RuntimeException("Unsupported spec type " + spec.getClass());
    }

    public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
        AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString()).setMetadata((Map<String, String>)ImmutableMap.of((Object)"Verb", (Object)SpecExecutor.Verb.DELETE.name())).setProperties((Map<String, String>)Maps.fromProperties((Properties)headers)).build();
        try {
            this.writeAvroJobSpec(avroJobSpec);
            return new CompletedFuture((Object)Boolean.TRUE, null);
        }
        catch (IOException e) {
            log.error("Exception encountered when writing DELETE spec");
            return new CompletedFuture((Object)Boolean.TRUE, (Throwable)e);
        }
    }

    public Future<? extends List<Spec>> listSpecs() {
        throw new UnsupportedOperationException();
    }

    private AvroJobSpec convertToAvroJobSpec(JobSpec jobSpec, SpecExecutor.Verb verb) {
        return AvroJobSpec.newBuilder().setUri(jobSpec.getUri().toString()).setProperties((Map<String, String>)Maps.fromProperties((Properties)jobSpec.getConfigAsProperties())).setTemplateUri("FS:///").setDescription(jobSpec.getDescription()).setVersion(jobSpec.getVersion()).setMetadata((Map<String, String>)ImmutableMap.of((Object)"Verb", (Object)verb.name())).build();
    }

    private void writeAvroJobSpec(AvroJobSpec jobSpec) throws IOException {
        SpecificDatumWriter datumWriter = new SpecificDatumWriter(AvroJobSpec.SCHEMA$);
        DataFileWriter dataFileWriter = new DataFileWriter((DatumWriter)datumWriter);
        Path jobSpecPath = new Path(this.specConsumerPath, this.annotateSpecFileName(jobSpec.getUri()));
        Path tmpDir = new Path(this.specConsumerPath, UUID.randomUUID().toString());
        if (!this.fs.exists(tmpDir)) {
            this.fs.mkdirs(tmpDir);
        }
        Path tmpJobSpecPath = new Path(tmpDir, jobSpec.getUri());
        FSDataOutputStream out = this.fs.create(tmpJobSpecPath);
        dataFileWriter.create(AvroJobSpec.SCHEMA$, (OutputStream)out);
        dataFileWriter.append((Object)jobSpec);
        dataFileWriter.close();
        HadoopUtils.renamePath((FileSystem)this.fs, (Path)tmpJobSpecPath, (Path)jobSpecPath, (boolean)true);
        log.info("Deleting {}", (Object)tmpJobSpecPath.getParent().toString());
        this.fs.delete(tmpJobSpecPath.getParent(), true);
    }

    private String annotateSpecFileName(String rawName) {
        return rawName + ".avro";
    }
}

