/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.job;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractScheduledService;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.job.JobProgress;
import org.apache.gobblin.runtime.job.TaskProgress;
import org.apache.gobblin.util.ReflectivePredicateEvaluator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobInterruptionPredicate
extends AbstractScheduledService {
    private static final Logger log = LoggerFactory.getLogger(JobInterruptionPredicate.class);
    public static final String INTERRUPTION_SQL = "org.apache.gobblin.jobInterruptionPredicate.sql";
    private final String sql;
    private final ReflectivePredicateEvaluator evaluator;
    private final JobProgress jobProgress;
    private final Runnable jobInterruptionHook;

    public JobInterruptionPredicate(JobState jobState, Runnable jobInterruptionHook, boolean autoStart) {
        this(jobState, jobState.getProp(INTERRUPTION_SQL), jobInterruptionHook, autoStart);
    }

    protected JobInterruptionPredicate(JobProgress jobProgress, String predicate, Runnable jobInterruptionHook, boolean autoStart) {
        this.sql = predicate;
        ReflectivePredicateEvaluator tmpEval = null;
        if (this.sql != null) {
            try {
                tmpEval = new ReflectivePredicateEvaluator(this.sql, JobProgress.class, TaskProgress.class);
            }
            catch (SQLException exc) {
                log.warn("Job interruption predicate is invalid, will not preemptively interrupt job.", (Throwable)exc);
            }
        }
        this.evaluator = tmpEval;
        this.jobProgress = jobProgress;
        this.jobInterruptionHook = jobInterruptionHook;
        if (autoStart && this.sql != null) {
            this.startAsync();
        }
    }

    protected void runOneIteration() {
        if (this.evaluator == null) {
            this.stopAsync();
            return;
        }
        switch (this.jobProgress.getState()) {
            case PENDING: {
                return;
            }
            case RUNNING: {
                try {
                    List<Object> objects = Stream.concat(Stream.of(this.jobProgress), this.jobProgress.getTaskProgress().stream()).collect(Collectors.toList());
                    if (!this.evaluator.evaluate(objects)) break;
                    log.info("Interrupting job due to satisfied job interruption predicate. Predicate: " + this.sql);
                    this.jobInterruptionHook.run();
                    this.stopAsync();
                    break;
                }
                catch (Throwable exc) {
                    log.warn("Failed to evaluate job interruption predicate. Will not preemptively interrupt job.", exc);
                    throw Throwables.propagate((Throwable)exc);
                }
            }
            default: {
                log.info(String.format("Detected job finished with state %s. Stopping job interruption predicate.", this.jobProgress.getState()));
                this.stopAsync();
            }
        }
    }

    protected AbstractScheduledService.Scheduler scheduler() {
        return AbstractScheduledService.Scheduler.newFixedDelaySchedule((long)30L, (long)30L, (TimeUnit)TimeUnit.SECONDS);
    }
}

