/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.aws;

import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;
import java.beans.ConstructorProperties;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URL;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.aws.GobblinAWSUtils;
import org.apache.gobblin.cluster.JobConfigurationManager;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.SchedulerUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alpha
public class AWSJobConfigurationManager
extends JobConfigurationManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(AWSJobConfigurationManager.class);
    private static final long DEFAULT_JOB_CONF_REFRESH_INTERVAL = 60L;
    private Optional<JobArchiveRetriever> jobArchiveRetriever;
    private Map<String, Properties> jobConfFiles = Maps.newHashMap();
    private final long refreshIntervalInSeconds;
    private final ScheduledExecutorService fetchJobConfExecutor;

    public AWSJobConfigurationManager(EventBus eventBus, Config config) {
        super(eventBus, config);
        this.refreshIntervalInSeconds = config.hasPath("gobblin.aws.job.conf.refresh.interval") ? config.getDuration("gobblin.aws.job.conf.refresh.interval", TimeUnit.SECONDS) : 60L;
        this.fetchJobConfExecutor = Executors.newSingleThreadScheduledExecutor(ExecutorsUtils.newThreadFactory((Optional)Optional.of((Object)LOGGER), (Optional)Optional.of((Object)"FetchJobConfExecutor")));
    }

    private void fetchJobConfSettings() {
        this.jobConfDirPath = this.config.hasPath("gobblin.cluster.job.conf.path") ? Optional.of((Object)this.config.getString("gobblin.cluster.job.conf.path")) : Optional.absent();
        this.jobArchiveRetriever = this.getJobArchiveRetriever(this.config);
    }

    protected void startUp() throws Exception {
        LOGGER.info("Starting the " + AWSJobConfigurationManager.class.getSimpleName());
        LOGGER.info(String.format("Scheduling the job configuration refresh task with an interval of %d second(s)", this.refreshIntervalInSeconds));
        this.fetchJobConfExecutor.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    AWSJobConfigurationManager.this.fetchJobConf();
                }
                catch (IOException | ConfigurationException e) {
                    LOGGER.error("Failed to fetch job configurations", e);
                    throw new RuntimeException("Failed to fetch job configurations", e);
                }
            }
        }, 0L, this.refreshIntervalInSeconds, TimeUnit.SECONDS);
    }

    private void fetchJobConf() throws IOException, ConfigurationException {
        this.fetchJobConfSettings();
        if (this.jobArchiveRetriever.isPresent() && this.jobConfDirPath.isPresent()) {
            String zipFile = ((JobArchiveRetriever)this.jobArchiveRetriever.get()).retrieve(this.config, (String)this.jobConfDirPath.get());
            String extractedPullFilesPath = GobblinAWSUtils.appendSlash((String)this.jobConfDirPath.get()) + "files";
            LOGGER.debug("Extracting to directory: " + extractedPullFilesPath + " from zip: " + zipFile);
            this.unzipArchive(zipFile, new File(extractedPullFilesPath));
            File jobConfigDir = new File(extractedPullFilesPath);
            if (jobConfigDir.exists()) {
                LOGGER.info("Loading job configurations from " + jobConfigDir);
                Properties properties = ConfigUtils.configToProperties((Config)this.config);
                properties.setProperty("jobconf.fullyQualifiedPath", jobConfigDir.getAbsolutePath());
                List jobConfigs = SchedulerUtils.loadGenericJobConfigs((Properties)properties);
                LOGGER.info("Loaded " + jobConfigs.size() + " job configuration(s)");
                for (Properties config : jobConfigs) {
                    LOGGER.debug("Config value: " + config);
                    String jobConfigPathIdentifier = config.getProperty("job.config.path");
                    if (!this.jobConfFiles.containsKey(jobConfigPathIdentifier)) {
                        this.jobConfFiles.put(jobConfigPathIdentifier, config);
                        this.postNewJobConfigArrival(config.getProperty("job.name"), config);
                        LOGGER.info("New config arrived for job: " + jobConfigPathIdentifier);
                        continue;
                    }
                    if (!config.equals(this.jobConfFiles.get(jobConfigPathIdentifier))) {
                        this.jobConfFiles.put(jobConfigPathIdentifier, config);
                        this.postNewJobConfigArrival(config.getProperty("job.name"), config);
                        LOGGER.info("Config updated for job: " + jobConfigPathIdentifier);
                        continue;
                    }
                    LOGGER.info("Config not changed for job: " + jobConfigPathIdentifier);
                }
            } else {
                LOGGER.warn("Job configuration directory " + jobConfigDir + " not found");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unzipArchive(String file, File outputDir) throws IOException {
        try (ZipFile zipFile = new ZipFile(file);){
            Enumeration<? extends ZipEntry> entries = zipFile.entries();
            while (entries.hasMoreElements()) {
                ZipEntry entry = entries.nextElement();
                File entryDestination = new File(outputDir, entry.getName());
                if (entry.isDirectory()) {
                    if (entryDestination.mkdirs() || entryDestination.exists()) continue;
                    throw new IOException("Could not create directory: " + entryDestination + " while un-archiving zip: " + file);
                }
                if (!entryDestination.getParentFile().mkdirs() && !entryDestination.getParentFile().exists()) {
                    throw new IOException("Could not create parent directory for: " + entryDestination + " while un-archiving zip: " + file);
                }
                InputStream in = null;
                FileOutputStream out = null;
                try {
                    in = zipFile.getInputStream(entry);
                    out = new FileOutputStream(entryDestination);
                    IOUtils.copy((InputStream)in, (OutputStream)out);
                }
                finally {
                    if (null != in) {
                        IOUtils.closeQuietly((InputStream)in);
                    }
                    if (null == out) continue;
                    IOUtils.closeQuietly((OutputStream)out);
                }
            }
        }
    }

    protected void shutDown() throws Exception {
        GobblinAWSUtils.shutdownExecutorService(((Object)((Object)this)).getClass(), this.fetchJobConfExecutor, LOGGER);
    }

    private Optional<JobArchiveRetriever> getJobArchiveRetriever(Config config) {
        if (config.hasPath("gobblin.aws.job.conf.source.file.fs.uri") && config.hasPath("gobblin.aws.job.conf.source.file.path")) {
            return Optional.of((Object)new HadoopJobArchiveRetriever(config.getString("gobblin.aws.job.conf.source.file.fs.uri"), config.getString("gobblin.aws.job.conf.source.file.path")));
        }
        if (config.hasPath("gobblin.aws.job.conf.s3.uri")) {
            LOGGER.warn("GobblinAWSConfigurationKeys.JOB_CONF_S3_URI_KEY is deprecated.  Switch to GobblinAWSConfigurationKeys.JOB_CONF_SOURCE_FILE_FS_URI_KEY and GobblinAWSConfigurationKeys.JOB_CONF_SOURCE_FILE_PATH_KEY.");
            return Optional.of((Object)new LegacyJobArchiveRetriever(config.getString("gobblin.aws.job.conf.s3.uri")));
        }
        return Optional.absent();
    }

    private static final class HadoopJobArchiveRetriever
    implements JobArchiveRetriever {
        private final String fsUri;
        private final String path;

        @Override
        public String retrieve(Config config, String targetDir) throws IOException {
            URI uri = URI.create(this.fsUri);
            FileSystem fs = FileSystem.get((URI)uri, (Configuration)HadoopUtils.getConfFromState((State)ConfigUtils.configToState((Config)config)));
            Path sourceFile = new Path(this.path);
            String zipFile = GobblinAWSUtils.appendSlash(targetDir) + StringUtils.substringAfterLast((String)this.path, (String)File.separator);
            LOGGER.debug("Downloading to zip: " + zipFile + " from uri: " + sourceFile);
            fs.copyToLocalFile(sourceFile, new Path(zipFile));
            return zipFile;
        }

        @ConstructorProperties(value={"fsUri", "path"})
        public HadoopJobArchiveRetriever(String fsUri, String path) {
            this.fsUri = fsUri;
            this.path = path;
        }

        public String getFsUri() {
            return this.fsUri;
        }

        public String getPath() {
            return this.path;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof HadoopJobArchiveRetriever)) {
                return false;
            }
            HadoopJobArchiveRetriever other = (HadoopJobArchiveRetriever)o;
            String this$fsUri = this.getFsUri();
            String other$fsUri = other.getFsUri();
            if (this$fsUri == null ? other$fsUri != null : !this$fsUri.equals(other$fsUri)) {
                return false;
            }
            String this$path = this.getPath();
            String other$path = other.getPath();
            return !(this$path == null ? other$path != null : !this$path.equals(other$path));
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            String $fsUri = this.getFsUri();
            result = result * 59 + ($fsUri == null ? 43 : $fsUri.hashCode());
            String $path = this.getPath();
            result = result * 59 + ($path == null ? 43 : $path.hashCode());
            return result;
        }

        public String toString() {
            return "AWSJobConfigurationManager.HadoopJobArchiveRetriever(fsUri=" + this.getFsUri() + ", path=" + this.getPath() + ")";
        }
    }

    private static final class LegacyJobArchiveRetriever
    implements JobArchiveRetriever {
        private final String uri;

        @Override
        public String retrieve(Config config, String targetDir) throws IOException {
            String zipFile = GobblinAWSUtils.appendSlash(targetDir) + StringUtils.substringAfterLast((String)this.uri, (String)File.separator);
            LOGGER.debug("Downloading to zip: " + zipFile + " from uri: " + this.uri);
            FileUtils.copyURLToFile((URL)new URL(this.uri), (File)new File(zipFile));
            return zipFile;
        }

        @ConstructorProperties(value={"uri"})
        public LegacyJobArchiveRetriever(String uri) {
            this.uri = uri;
        }

        public String getUri() {
            return this.uri;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof LegacyJobArchiveRetriever)) {
                return false;
            }
            LegacyJobArchiveRetriever other = (LegacyJobArchiveRetriever)o;
            String this$uri = this.getUri();
            String other$uri = other.getUri();
            return !(this$uri == null ? other$uri != null : !this$uri.equals(other$uri));
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            String $uri = this.getUri();
            result = result * 59 + ($uri == null ? 43 : $uri.hashCode());
            return result;
        }

        public String toString() {
            return "AWSJobConfigurationManager.LegacyJobArchiveRetriever(uri=" + this.getUri() + ")";
        }
    }

    private static interface JobArchiveRetriever {
        public String retrieve(Config var1, String var2) throws IOException;
    }
}

