/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.scheduler;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.AbstractIdleService;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.runtime.JobLauncherFactory;
import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
import org.apache.gobblin.runtime.listeners.EmailNotificationJobListener;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.runtime.listeners.RunOnceJobListener;
import org.apache.gobblin.scheduler.BaseGobblinJob;
import org.apache.gobblin.scheduler.PathAlterationListenerAdaptorForMonitor;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.SchedulerUtils;
import org.apache.gobblin.util.filesystem.PathAlterationListener;
import org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
import org.apache.hadoop.fs.Path;
import org.quartz.CronScheduleBuilder;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.InterruptableJob;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.ScheduleBuilder;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.UnableToInterruptJobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobScheduler
extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(JobScheduler.class);
    // Keys under which scheduleJob(...) stores, respectively, this scheduler, the job
    // properties and the job listener in the Quartz JobDataMap; GobblinJob reads them back.
    public static final String JOB_SCHEDULER_KEY = "jobScheduler";
    public static final String PROPERTIES_KEY = "jobProps";
    public static final String JOB_LISTENER_KEY = "jobListener";
    // Configuration handed to the constructor. startUp() may add "jobconf.fullyQualifiedPath" to it.
    public final Properties properties;
    // Service whose getScheduler() exposes the Quartz scheduler used for cron-scheduled jobs.
    private final SchedulerService scheduler;
    // Fixed-size pool that runs non-scheduled jobs, immediate jobs and submitted runnables.
    protected final ExecutorService jobExecutor;
    // Job name -> listener, written by scheduleJob(...) for cron-scheduled jobs with a non-null
    // listener. It is not read anywhere in this file.
    private final Map<String, JobListener> jobListenerMap = Maps.newHashMap();
    // Job name -> Quartz key of every job this scheduler has registered with Quartz.
    private final Map<String, JobKey> scheduledJobs = Maps.newHashMap();
    // Parsed from the comma-separated "jobconf.extensions" property (default "pull,job").
    public final Set<String> jobConfigFileExtensions;
    // Built from "jobconf.fullyQualifiedPath"; null when that property is absent at construction time.
    public final Path jobConfigFileDirPath;
    // Created with the polling interval from "jobconf.monitor.interval" (default 30000).
    public final PathAlterationObserverScheduler pathAlterationDetector;
    // Listener bound to jobConfigFileDirPath; null when "jobconf.fullyQualifiedPath" is absent at construction time.
    public final PathAlterationListenerAdaptorForMonitor listener;
    // Parsed from "scheduler.wait.for.job.completion". NOTE(review): not read in this file.
    private final boolean waitForJobCompletion;
    // Holds the resources to release in shutDown(); currently the stopper of pathAlterationDetector.
    private final Closer closer = Closer.create();
    private final JobSpecResolver jobSpecResolver;
    // Set to true by shutDown(); the Future returned by scheduleJobImmediately(...) refuses to cancel until then.
    private volatile boolean cancelRequested = false;

    /**
     * Creates a job scheduler on top of the given scheduler service.
     *
     * <p>Reads the following properties (defaults in parentheses):
     * {@code jobexecutor.threadpool.size} (5), {@code jobconf.extensions} ("pull,job"),
     * {@code jobconf.monitor.interval} (30000) and {@code scheduler.wait.for.job.completion}.
     * The job configuration directory and its change listener are only created when
     * {@code jobconf.fullyQualifiedPath} is present; otherwise both are left {@code null}.
     *
     * @param properties scheduler configuration; retained by reference, not copied
     * @param scheduler service exposing the underlying Quartz scheduler
     * @throws Exception if a numeric property cannot be parsed or the job spec resolver cannot be built
     */
    public JobScheduler(Properties properties, SchedulerService scheduler) throws Exception {
        this.properties = properties;
        this.scheduler = scheduler;

        int executorPoolSize = Integer.parseInt(properties.getProperty("jobexecutor.threadpool.size", Integer.toString(5)));
        this.jobExecutor = Executors.newFixedThreadPool(
                executorPoolSize,
                ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("JobScheduler-%d")));

        String extensionList = this.properties.getProperty("jobconf.extensions", "pull,job");
        this.jobConfigFileExtensions = Sets.newHashSet(Splitter.on(",").omitEmptyStrings().split(extensionList));

        String monitorInterval = this.properties.getProperty("jobconf.monitor.interval", Long.toString(30000L));
        this.pathAlterationDetector = new PathAlterationObserverScheduler(Long.parseLong(monitorInterval));

        String waitSetting = this.properties.getProperty("scheduler.wait.for.job.completion", ConfigurationKeys.DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION);
        this.waitForJobCompletion = Boolean.parseBoolean(waitSetting);

        this.jobSpecResolver = JobSpecResolver.builder(ConfigUtils.propertiesToConfig(properties)).build();

        if (!this.properties.containsKey("jobconf.fullyQualifiedPath")) {
            // Without a fully qualified path there is nothing to monitor.
            this.jobConfigFileDirPath = null;
            this.listener = null;
        } else {
            this.jobConfigFileDirPath = new Path(this.properties.getProperty("jobconf.fullyQualifiedPath"));
            this.listener = new PathAlterationListenerAdaptorForMonitor(this.jobConfigFileDirPath, this);
        }
    }

    /**
     * Starts the job scheduler once the underlying scheduler service is running.
     *
     * <p>Waits up to 30 seconds for the scheduler service. If {@code jobconf.dir} or
     * {@code jobconf.fullyQualifiedPath} is configured, the job configuration services are
     * started; when only {@code jobconf.dir} is present, {@code jobconf.fullyQualifiedPath}
     * is derived from it by prefixing {@code file://}.
     *
     * @throws IllegalStateException if the scheduler service is not running within the timeout;
     *         the triggering exception is attached as the cause
     * @throws Exception if starting the job configuration services fails
     */
    @Override
    protected void startUp() throws Exception {
        LOG.info("Starting the job scheduler");
        try {
            this.scheduler.awaitRunning(30L, TimeUnit.SECONDS);
        }
        catch (IllegalStateException | TimeoutException exc) {
            // Keep the original failure as the cause so the reason (timeout vs. failed service) is not lost.
            throw new IllegalStateException("Scheduler service is not running.", exc);
        }
        boolean hasConfigDir = this.properties.containsKey("jobconf.dir");
        boolean hasFullyQualifiedPath = this.properties.containsKey("jobconf.fullyQualifiedPath");
        if (hasConfigDir || hasFullyQualifiedPath) {
            if (hasConfigDir && !hasFullyQualifiedPath) {
                this.properties.setProperty("jobconf.fullyQualifiedPath", "file://" + this.properties.getProperty("jobconf.dir"));
            }
            this.startServices();
        }
    }

    /**
     * Starts the job configuration file monitor and then schedules every configured job.
     *
     * @throws Exception if the monitor cannot be started or the configured jobs cannot be loaded
     */
    protected void startServices() throws Exception {
        startGeneralJobConfigFileMonitor();
        scheduleGeneralConfiguredJobs();
    }

    /**
     * Stops the job scheduler: closes registered resources, marks cancellation as requested,
     * interrupts every currently executing Quartz job and shuts down the job executor.
     *
     * <p>Each later step runs even if an earlier one throws, so a failure while closing the
     * file monitor cannot leave the executor threads running.
     *
     * @throws Exception if closing resources or querying the Quartz scheduler fails
     */
    @Override
    protected void shutDown() throws Exception {
        LOG.info("Stopping the job scheduler");
        try {
            this.closer.close();
        } finally {
            // From here on the Future returned by scheduleJobImmediately(...) accepts cancellation.
            this.cancelRequested = true;
            try {
                List<JobExecutionContext> currentExecutions = this.scheduler.getScheduler().getCurrentlyExecutingJobs();
                for (JobExecutionContext jobExecutionContext : currentExecutions) {
                    try {
                        this.scheduler.getScheduler().interrupt(jobExecutionContext.getFireInstanceId());
                    }
                    catch (UnableToInterruptJobException e) {
                        // Best effort: keep interrupting the remaining jobs.
                        LOG.error("Failed to cancel job " + jobExecutionContext.getJobDetail().getKey(), e);
                    }
                }
            } finally {
                ExecutorsUtils.shutdownExecutorService(this.jobExecutor, Optional.of(LOG));
            }
        }
    }

    /**
     * Schedules a job as a {@link GobblinJob} with no additional job data.
     *
     * <p>Failures are logged and not propagated, so a single bad job does not abort the caller.
     *
     * @param jobProps job configuration properties
     * @param jobListener listener to attach to the job
     * @throws JobException declared for compatibility; scheduling failures are caught and logged here
     */
    public void scheduleJob(Properties jobProps, JobListener jobListener) throws JobException {
        Map<String, Object> noExtraJobData = Maps.newHashMap();
        try {
            this.scheduleJob(jobProps, jobListener, noExtraJobData, GobblinJob.class);
        }
        catch (RuntimeException | JobException schedulingFailure) {
            String jobName = jobProps.getProperty("job.name", "Unknown job");
            LOG.error("Could not schedule job " + jobName, schedulingFailure);
        }
    }

    /**
     * Submits a job to the job executor for immediate execution with the given launcher.
     *
     * <p>The returned future delegates status and result queries to the executor's future.
     * Its {@code cancel} is only honoured after shutdown has requested cancellation; it then
     * asks the launcher to cancel the job and, if {@code mayInterruptIfRunning} is set, also
     * cancels the underlying task.
     *
     * @param jobProps job configuration properties
     * @param jobListener listener passed to the launcher
     * @param jobLauncher launcher used to run and, on cancellation, cancel the job
     * @return a future tracking the submitted job
     */
    public Future<?> scheduleJobImmediately(final Properties jobProps, final JobListener jobListener, final JobLauncher jobLauncher) {
        Callable<Void> callable = new Callable<Void>() {

            @Override
            public Void call() throws JobException {
                try {
                    JobScheduler.this.runJob(jobProps, jobListener, jobLauncher);
                }
                catch (JobException je) {
                    LOG.error("Failed to run job " + jobProps.getProperty("job.name"), je);
                    throw je;
                }
                return null;
            }
        };
        final Future<Void> future = this.jobExecutor.submit(callable);
        // Typed wrapper (instead of a raw Future) so the compiler checks the delegating overrides.
        return new Future<Void>() {

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                // Cancellation is refused until shutDown() has set the flag.
                if (!JobScheduler.this.cancelRequested) {
                    return false;
                }
                boolean result = true;
                try {
                    jobLauncher.cancelJob(jobListener);
                }
                catch (JobException e) {
                    LOG.error("Failed to cancel job " + jobProps.getProperty("job.name"), e);
                    result = false;
                }
                if (mayInterruptIfRunning) {
                    result &= future.cancel(true);
                }
                return result;
            }

            @Override
            public boolean isCancelled() {
                return future.isCancelled();
            }

            @Override
            public boolean isDone() {
                return future.isDone();
            }

            @Override
            public Void get() throws InterruptedException, ExecutionException {
                return future.get();
            }

            @Override
            public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return future.get(timeout, unit);
            }
        };
    }

    /**
     * Builds a launcher for the job and submits the job for immediate execution.
     *
     * @param jobProps job configuration properties
     * @param jobListener listener passed to the launcher
     * @return a future tracking the submitted job
     * @throws JobException if the launcher cannot be built or the job cannot be submitted
     */
    public Future<?> scheduleJobImmediately(Properties jobProps, JobListener jobListener) throws JobException {
        try {
            JobLauncher launcher = this.buildJobLauncher(jobProps);
            return this.scheduleJobImmediately(jobProps, jobListener, launcher);
        }
        catch (Exception cause) {
            String jobName = jobProps.getProperty("job.name");
            throw new JobException("Job " + jobName + " cannot be immediately scheduled.", cause);
        }
    }

    /**
     * Hands the given task to the job executor.
     *
     * @param runnable task to execute
     */
    public void submitRunnableToExecutor(Runnable runnable) {
        jobExecutor.execute(runnable);
    }

    /**
     * Schedules a job with the Quartz scheduler, or runs it once if it has no schedule.
     *
     * <p>A job already scheduled under the same name is unscheduled first. Jobs with
     * {@code job.disabled=true} are skipped. Jobs without {@code job.schedule} are handed to
     * the job executor and not registered with Quartz.
     *
     * @param jobProps job configuration properties; must contain {@code job.name}
     * @param jobListener listener for the job; may be {@code null}
     * @param additionalJobData extra entries copied into the Quartz job data map
     * @param jobClass Quartz job class to instantiate on each trigger
     * @throws JobException if the job cannot be unscheduled or scheduled
     * @throws IllegalArgumentException if {@code job.name} is missing
     */
    public void scheduleJob(Properties jobProps, JobListener jobListener, Map<String, Object> additionalJobData, Class<? extends Job> jobClass) throws JobException {
        Preconditions.checkArgument(jobProps.containsKey("job.name"), "A job must have a job name specified by job.name");
        final String jobName = jobProps.getProperty("job.name");

        if (this.scheduledJobs.containsKey(jobName)) {
            LOG.info("Job " + jobName + " was already scheduled, un-scheduling it now.");
            this.unscheduleJob(jobName);
        }

        if (Boolean.parseBoolean(jobProps.getProperty("job.disabled", "false"))) {
            LOG.info("Skipping disabled job " + jobName);
            return;
        }

        if (!jobProps.containsKey("job.schedule")) {
            // No cron expression: run the job once on the executor instead of via Quartz.
            this.jobExecutor.execute(new NonScheduledJobRunner(jobProps, jobListener));
            return;
        }

        if (jobListener != null) {
            this.jobListenerMap.put(jobName, jobListener);
        }

        // Everything the Quartz job needs at trigger time travels in the job data map.
        JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put(JOB_SCHEDULER_KEY, this);
        jobDataMap.put(PROPERTIES_KEY, jobProps);
        jobDataMap.put(JOB_LISTENER_KEY, jobListener);
        jobDataMap.putAll(additionalJobData);

        JobBuilder jobBuilder = JobBuilder.newJob(jobClass);
        String jobGroup = Strings.nullToEmpty(jobProps.getProperty("job.group"));
        String jobDescription = Strings.nullToEmpty(jobProps.getProperty("job.description"));
        JobDetail job = jobBuilder
                .withIdentity(jobName, jobGroup)
                .withDescription(jobDescription)
                .usingJobData(jobDataMap)
                .build();

        try {
            Trigger trigger = this.getTrigger(job.getKey(), jobProps);
            this.scheduler.getScheduler().scheduleJob(job, trigger);
            LOG.info(String.format("Scheduled job %s. Next run: %s.", job.getKey(), trigger.getNextFireTime()));
        }
        catch (SchedulerException se) {
            String failureMessage = "Failed to schedule job " + jobName;
            LOG.error(failureMessage, se);
            throw new JobException(failureMessage, se);
        }

        this.scheduledJobs.put(jobName, job.getKey());
    }

    /**
     * Removes a job from the Quartz scheduler if this scheduler has it registered.
     *
     * @param jobName name of the job to unschedule; unknown names are ignored
     * @throws JobException if Quartz fails to delete the job
     */
    public void unscheduleJob(String jobName) throws JobException {
        if (!this.scheduledJobs.containsKey(jobName)) {
            return;
        }
        try {
            JobKey jobKey = this.scheduledJobs.remove(jobName);
            this.scheduler.getScheduler().deleteJob(jobKey);
        }
        catch (SchedulerException se) {
            String failureMessage = "Failed to unschedule and delete job " + jobName;
            LOG.error(failureMessage, se);
            throw new JobException(failureMessage, se);
        }
    }

    /**
     * Removes all scheduling data from the Quartz scheduler and forgets every job this
     * scheduler had registered, so {@link #getScheduledJobs()} no longer reports them.
     *
     * @throws SchedulerException if the Quartz scheduler cannot be cleared; the local
     *         bookkeeping is left untouched in that case
     */
    public void unscheduleAllJobs() throws SchedulerException {
        this.scheduler.getScheduler().clear();
        // Keep the name -> key bookkeeping in step with Quartz, which no longer holds any job.
        this.scheduledJobs.clear();
    }

    /**
     * Builds a launcher for the job and runs the job with it.
     *
     * @param jobProps job configuration properties
     * @param jobListener listener passed to the launcher
     * @throws JobException if the launcher cannot be built or the job fails
     */
    public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
        try {
            JobLauncher launcher = this.buildJobLauncher(jobProps);
            this.runJob(jobProps, jobListener, launcher);
        }
        catch (Exception cause) {
            throw new JobException("Failed to run job " + jobProps.getProperty("job.name"), cause);
        }
    }

    /**
     * Creates a launcher for the given job using this scheduler's configuration.
     *
     * @param jobProps job configuration properties
     * @return a new job launcher
     * @throws Exception if the launcher factory fails
     */
    public JobLauncher buildJobLauncher(Properties jobProps) throws Exception {
        return JobLauncherFactory.newJobLauncher(properties, jobProps);
    }

    /**
     * Runs a job with the given launcher, closing the launcher afterwards.
     *
     * <p>Jobs with {@code job.disabled=true} are skipped. After a run that was not stopped
     * early, a job with {@code job.runonce=true} that is still registered is deleted from
     * the Quartz scheduler.
     *
     * @param jobProps job configuration properties; must contain {@code job.name}
     * @param jobListener listener passed to the launcher
     * @param jobLauncher launcher that executes the job
     * @return the launcher's early-stopped flag; {@code false} if the job is disabled
     * @throws JobException if launching, running or cleaning up the job fails
     * @throws IllegalArgumentException if {@code job.name} is missing
     */
    public boolean runJob(Properties jobProps, JobListener jobListener, JobLauncher jobLauncher) throws JobException {
        Preconditions.checkArgument(jobProps.containsKey("job.name"), "A job must have a job name specified by job.name");
        final String jobName = jobProps.getProperty("job.name");

        if (Boolean.parseBoolean(jobProps.getProperty("job.disabled", "false"))) {
            LOG.info("Skipping disabled job " + jobName);
            return false;
        }

        try (Closer launcherCloser = Closer.create()) {
            launcherCloser.register(jobLauncher).launchJob(jobListener);

            boolean runOnce = Boolean.parseBoolean(jobProps.getProperty("job.runonce", "false"));
            boolean isEarlyStopped = jobLauncher.isEarlyStopped();
            if (runOnce && !isEarlyStopped) {
                if (this.scheduledJobs.containsKey(jobName)) {
                    JobKey finishedJobKey = this.scheduledJobs.remove(jobName);
                    this.scheduler.getScheduler().deleteJob(finishedJobKey);
                }
            }
            return isEarlyStopped;
        }
        catch (Throwable t) {
            throw new JobException("Failed to launch and run job " + jobName, t);
        }
    }

    /**
     * Returns the names of the jobs currently registered with this scheduler.
     *
     * @return the key set of the internal job-name map (a live view, not a copy)
     */
    public Collection<String> getScheduledJobs() {
        return scheduledJobs.keySet();
    }

    /**
     * Loads every job configuration and schedules it.
     *
     * <p>Jobs without {@code job.schedule} are marked {@code job.runonce=true}. Run-once jobs
     * get a {@link RunOnceJobListener}; all others get an {@link EmailNotificationJobListener}.
     * Each job is also added to the path listener's job-name map.
     *
     * @throws ConfigurationException if a job configuration cannot be loaded
     * @throws JobException declared by the scheduling call
     * @throws IOException if reading job configurations fails
     */
    private void scheduleGeneralConfiguredJobs() throws ConfigurationException, JobException, IOException {
        LOG.info("Scheduling configured jobs");
        for (Properties jobProps : this.loadGeneralJobConfigs()) {
            if (!jobProps.containsKey("job.schedule")) {
                // A job without a schedule can only sensibly run once.
                jobProps.setProperty("job.runonce", "true");
            }
            boolean runOnce = Boolean.parseBoolean(jobProps.getProperty("job.runonce", "false"));
            JobListener jobListener;
            if (runOnce) {
                jobListener = new RunOnceJobListener();
            } else {
                jobListener = new EmailNotificationJobListener();
            }
            this.scheduleJob(jobProps, jobListener);
            this.listener.addToJobNameMap(jobProps);
        }
    }

    /**
     * Loads the job configurations via {@link SchedulerUtils} and logs how many were found.
     *
     * @return the loaded job configurations
     * @throws ConfigurationException if a job configuration cannot be loaded
     * @throws IOException if reading job configurations fails
     */
    private List<Properties> loadGeneralJobConfigs() throws ConfigurationException, IOException {
        List<Properties> loadedConfigs = SchedulerUtils.loadGenericJobConfigs(this.properties, this.jobSpecResolver);
        int configCount = loadedConfigs.size();
        LOG.info(String.format("Loaded %d job configurations", configCount));
        return loadedConfigs;
    }

    /**
     * Registers the job configuration listener on the configured directory, starts the
     * path alteration detector and arranges for it to be stopped on shutdown.
     *
     * @throws Exception if the observer cannot be registered or the detector cannot be started
     */
    private void startGeneralJobConfigFileMonitor() throws Exception {
        SchedulerUtils.addPathAlterationObserver(this.pathAlterationDetector, this.listener, this.jobConfigFileDirPath);
        this.pathAlterationDetector.start();
        // Stop the detector when the scheduler's closer is closed in shutDown().
        this.closer.register(new Closeable() {

            @Override
            public void close() throws IOException {
                try {
                    JobScheduler.this.pathAlterationDetector.stop(1000L);
                }
                catch (InterruptedException e) {
                    // Restore the interrupt flag: wrapping the exception would otherwise hide the interrupt.
                    Thread.currentThread().interrupt();
                    throw new IOException(e);
                }
            }
        });
    }

    /**
     * Builds the cron trigger for a job from its {@code job.schedule} expression.
     *
     * <p>The trigger is identified by {@code job.name} and {@code job.group} (empty if absent).
     *
     * @param jobKey key of the Quartz job the trigger fires
     * @param jobProps job configuration properties
     * @return the cron trigger for the job
     */
    private Trigger getTrigger(JobKey jobKey, Properties jobProps) {
        TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
        String triggerName = jobProps.getProperty("job.name");
        String triggerGroup = Strings.nullToEmpty(jobProps.getProperty("job.group"));
        String cronExpression = jobProps.getProperty("job.schedule");
        return triggerBuilder
                .withIdentity(triggerName, triggerGroup)
                .forJob(jobKey)
                .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
                .build();
    }

    /**
     * Returns the job spec resolver built from this scheduler's configuration.
     *
     * @return the job spec resolver
     */
    public JobSpecResolver getJobSpecResolver() {
        return jobSpecResolver;
    }

    /**
     * Tells whether shutdown has requested cancellation of running jobs.
     *
     * @return {@code true} once {@code shutDown()} has set the cancellation flag
     */
    public boolean isCancelRequested() {
        return cancelRequested;
    }

    class NonScheduledJobRunner
    implements Runnable {
        private final Properties jobProps;
        private final JobListener jobListener;

        public NonScheduledJobRunner(Properties jobProps, JobListener jobListener) {
            this.jobProps = jobProps;
            this.jobListener = jobListener;
        }

        @Override
        public void run() {
            try {
                JobScheduler.this.runJob(this.jobProps, this.jobListener);
            }
            catch (JobException je) {
                LOG.error("Failed to run job " + this.jobProps.getProperty("job.name"), (Throwable)je);
            }
        }
    }

    /**
     * Quartz job that runs a Gobblin job through the {@link JobScheduler} stored in the
     * job data map. Concurrent executions of the same job are disallowed.
     */
    @DisallowConcurrentExecution
    public static class GobblinJob extends BaseGobblinJob implements InterruptableJob {
        private static final Logger log = LoggerFactory.getLogger(GobblinJob.class);

        /**
         * Looks up the scheduler, job properties and listener in the job data map and runs the job.
         *
         * @param context Quartz execution context of the fired job
         * @throws JobExecutionException wrapping anything thrown while running the job
         */
        @Override
        public void executeImpl(JobExecutionContext context) throws JobExecutionException {
            LOG.info("Starting job " + context.getJobDetail().getKey());

            JobDataMap jobData = context.getJobDetail().getJobDataMap();
            JobScheduler owningScheduler = (JobScheduler) jobData.get(JobScheduler.JOB_SCHEDULER_KEY);
            Properties jobProps = (Properties) jobData.get(JobScheduler.PROPERTIES_KEY);
            JobListener jobListener = (JobListener) jobData.get(JobScheduler.JOB_LISTENER_KEY);

            try {
                owningScheduler.runJob(jobProps, jobListener);
            }
            catch (Throwable failure) {
                throw new JobExecutionException(failure);
            }
        }

        /**
         * Records that an interrupt was requested; no further action is taken here.
         */
        @Override
        public void interrupt() throws UnableToInterruptJobException {
            log.info("Job was interrupted");
        }
    }

    /**
     * Scheduling actions that can be requested for a job.
     * NOTE(review): not referenced within this file; presumably used by callers elsewhere.
     */
    public enum Action {
        SCHEDULE,
        RESCHEDULE,
        UNSCHEDULE
    }
}

