/*
 * Decompiled with CFR 0.152.
 */
package com.github.kagkarlsson.scheduler;

import com.github.kagkarlsson.scheduler.Clock;
import com.github.kagkarlsson.scheduler.CurrentlyExecuting;
import com.github.kagkarlsson.scheduler.ExecutorUtils;
import com.github.kagkarlsson.scheduler.JdbcTaskRepository;
import com.github.kagkarlsson.scheduler.SchedulerClient;
import com.github.kagkarlsson.scheduler.SchedulerName;
import com.github.kagkarlsson.scheduler.SchedulerState;
import com.github.kagkarlsson.scheduler.StatsRegistry;
import com.github.kagkarlsson.scheduler.SystemClock;
import com.github.kagkarlsson.scheduler.TaskRepository;
import com.github.kagkarlsson.scheduler.TaskResolver;
import com.github.kagkarlsson.scheduler.Waiter;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.ExecutionComplete;
import com.github.kagkarlsson.scheduler.task.ExecutionContext;
import com.github.kagkarlsson.scheduler.task.ExecutionOperations;
import com.github.kagkarlsson.scheduler.task.OnStartup;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polls a {@link TaskRepository} for due executions and runs them on a pool of worker threads.
 *
 * <p>After {@link #start()} three single-threaded background loops run until {@link #stop()} is
 * called:
 * <ul>
 *   <li>{@link #executeDue()} - picks due executions and hands them to the worker pool,</li>
 *   <li>{@link #detectDeadExecutions()} - hands executions returned by
 *       {@code TaskRepository.getOldExecutions} to the owning task's dead-execution handler,</li>
 *   <li>{@link #updateHeartbeats()} - updates the heartbeat of every execution currently being
 *       processed by this instance.</li>
 * </ul>
 *
 * <p>The number of executions processed concurrently is limited by {@link #executorsSemaphore},
 * which is created with {@code maxThreads} permits.
 */
public class Scheduler implements SchedulerClient {
    private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);

    /** Maximum time {@link #stop()} waits for running executions to finish. */
    public static final Duration SHUTDOWN_WAIT = Duration.ofMinutes(30L);

    private final Clock clock;
    private final TaskRepository taskRepository;
    /** Worker pool on which task executions run. */
    private final ExecutorService executorService;
    /** Pause between two polls for due executions. */
    private final Waiter waiter;
    private final List<OnStartup> onStartup;
    /** Pause between two dead-execution checks (twice the heartbeat interval). */
    private final Waiter detectDeadWaiter;
    private final Duration heartbeatInterval;
    private final StatsRegistry statsRegistry;
    private final ExecutorService dueExecutor;
    private final ExecutorService detectDeadExecutor;
    private final ExecutorService updateHeartbeatExecutor;
    /** Executions picked by this instance that have not yet been released. */
    private final Map<Execution, CurrentlyExecuting> currentlyProcessing =
            Collections.synchronizedMap(new HashMap<>());
    /** Pause between two heartbeat updates (one heartbeat interval). */
    private final Waiter heartbeatWaiter;
    private final SchedulerState.SettableSchedulerState schedulerState =
            new SchedulerState.SettableSchedulerState();
    /** One permit per execution that may be processed concurrently. */
    final Semaphore executorsSemaphore;

    /**
     * Creates a scheduler that runs executions on a fixed thread pool of {@code maxThreads}
     * threads whose names are prefixed with the scheduler name.
     */
    Scheduler(Clock clock, TaskRepository taskRepository, int maxThreads, SchedulerName schedulerName, Waiter waiter, Duration updateHeartbeatWaiter, StatsRegistry statsRegistry, List<OnStartup> onStartup) {
        this(clock, taskRepository, maxThreads, Scheduler.defaultExecutorService(maxThreads, schedulerName), schedulerName, waiter, updateHeartbeatWaiter, statsRegistry, onStartup);
    }

    private static ExecutorService defaultExecutorService(int maxThreads, SchedulerName schedulerName) {
        return Executors.newFixedThreadPool(maxThreads, ExecutorUtils.defaultThreadFactoryWithPrefix(schedulerName.getName() + "-"));
    }

    /**
     * Creates a scheduler that runs executions on the supplied {@code executorService}.
     *
     * @param maxThreads        number of permits in {@link #executorsSemaphore}, i.e. the maximum
     *                          number of executions processed at the same time
     * @param heartbeatInterval interval between heartbeat updates; dead-execution detection runs
     *                          every two intervals and uses four intervals as its age limit
     */
    Scheduler(Clock clock, TaskRepository taskRepository, int maxThreads, ExecutorService executorService, SchedulerName schedulerName, Waiter waiter, Duration heartbeatInterval, StatsRegistry statsRegistry, List<OnStartup> onStartup) {
        this.clock = clock;
        this.taskRepository = taskRepository;
        this.executorService = executorService;
        this.waiter = waiter;
        this.onStartup = onStartup;
        this.detectDeadWaiter = new Waiter(heartbeatInterval.multipliedBy(2L));
        this.heartbeatInterval = heartbeatInterval;
        this.heartbeatWaiter = new Waiter(heartbeatInterval);
        this.statsRegistry = statsRegistry;
        String name = schedulerName.getName();
        this.dueExecutor = Executors.newSingleThreadExecutor(ExecutorUtils.defaultThreadFactoryWithPrefix(name + "-execute-due-"));
        this.detectDeadExecutor = Executors.newSingleThreadExecutor(ExecutorUtils.defaultThreadFactoryWithPrefix(name + "-detect-dead-"));
        this.updateHeartbeatExecutor = Executors.newSingleThreadExecutor(ExecutorUtils.defaultThreadFactoryWithPrefix(name + "-update-heartbeat-"));
        this.executorsSemaphore = new Semaphore(maxThreads);
    }

    /**
     * Submits the three background loops and then invokes every registered {@link OnStartup}
     * callback with this scheduler.
     */
    public void start() {
        LOG.info("Starting scheduler");
        dueExecutor.submit(new RunUntilShutdown(this::executeDue, waiter, schedulerState, statsRegistry));
        detectDeadExecutor.submit(new RunUntilShutdown(this::detectDeadExecutions, detectDeadWaiter, schedulerState, statsRegistry));
        updateHeartbeatExecutor.submit(new RunUntilShutdown(this::updateHeartbeats, heartbeatWaiter, schedulerState, statsRegistry));
        onStartup.forEach(os -> os.onStartup(this));
    }

    /**
     * Marks the scheduler as shutting down, stops the background loops (waiting up to five
     * seconds for each) and then waits up to {@link #SHUTDOWN_WAIT} for running executions.
     * Executions still registered as being processed after that are listed in a warning.
     */
    public void stop() {
        schedulerState.setIsShuttingDown();
        LOG.info("Shutting down Scheduler.");
        Duration loopShutdownWait = Duration.ofSeconds(5L);
        if (!ExecutorUtils.shutdownNowAndAwaitTermination(dueExecutor, loopShutdownWait)) {
            LOG.warn("Failed to shutdown due-executor properly.");
        }
        if (!ExecutorUtils.shutdownNowAndAwaitTermination(detectDeadExecutor, loopShutdownWait)) {
            LOG.warn("Failed to shutdown detect-dead-executor properly.");
        }
        if (!ExecutorUtils.shutdownNowAndAwaitTermination(updateHeartbeatExecutor, loopShutdownWait)) {
            LOG.warn("Failed to shutdown update-heartbeat-executor properly.");
        }
        LOG.info("Letting running executions finish. Will wait up to {}.", SHUTDOWN_WAIT);
        if (ExecutorUtils.shutdownAndAwaitTermination(executorService, SHUTDOWN_WAIT)) {
            LOG.info("Scheduler stopped.");
        } else {
            // Copy the key set first so the stream never iterates the live synchronized map.
            List<Execution> stillRunning = new ArrayList<>(currentlyProcessing.keySet());
            LOG.warn("Scheduler stopped, but some tasks did not complete. Was currently running the following executions:\n{}",
                    stillRunning.stream().map(Execution::toString).collect(Collectors.joining("\n")));
        }
    }

    /**
     * Stores a new execution of {@code taskInstance} at {@code executionTime} via
     * {@code TaskRepository.createIfNotExists}.
     */
    @Override
    public void scheduleForExecution(LocalDateTime executionTime, TaskInstance taskInstance) {
        taskRepository.createIfNotExists(new Execution(executionTime, taskInstance));
    }

    /** Returns a snapshot copy of the executions currently being processed by this instance. */
    public List<CurrentlyExecuting> getCurrentlyExecuting() {
        return new ArrayList<>(currentlyProcessing.values());
    }

    /** Delegates to {@code TaskRepository.getExecutionsFailingLongerThan}. */
    public List<Execution> getFailingExecutions(Duration failingAtLeastFor) {
        return taskRepository.getExecutionsFailingLongerThan(failingAtLeastFor);
    }

    /**
     * Fetches the executions that are due now and tries to pick and run each of them.
     *
     * <p>Stops early when the scheduler is shutting down or when no executor permit is
     * available. An execution that could not be picked (the repository returned an empty
     * result) is skipped and the loop moves on to the next one.
     */
    void executeDue() {
        if (executorsSemaphore.availablePermits() <= 0) {
            // All workers are busy; do not even query the repository.
            return;
        }
        LocalDateTime now = clock.now();
        List<Execution> dueExecutions = taskRepository.getDue(now);
        LOG.trace("Found {} taskinstances due for execution", dueExecutions.size());

        int handled = 0;
        for (Execution due : dueExecutions) {
            if (schedulerState.isShuttingDown()) {
                LOG.info("Scheduler has been shutdown. Skipping {} due executions.", dueExecutions.size() - handled);
                return;
            }
            Optional<Execution> pickedExecution;
            try {
                pickedExecution = aquireExecutorAndPickExecution(due);
            } catch (NoAvailableExecutors ex) {
                LOG.debug("No available executors. Skipping {} due executions.", dueExecutions.size() - handled);
                return;
            }
            // Count the execution as handled whether or not the pick succeeded, so the
            // "Skipping N" messages above report only what has not been looked at yet.
            handled++;
            if (!pickedExecution.isPresent()) {
                // Losing one pick says nothing about the remaining executions: keep going.
                LOG.debug("Execution picked by another scheduler. Continuing to next due execution.");
                continue;
            }
            submitForExecution(pickedExecution.get());
        }
    }

    /**
     * Hands a picked execution to the worker pool and releases its executor permit when the
     * run has finished. If the pool refuses the task, the permit and the bookkeeping entry are
     * released immediately and the failure is rethrown.
     */
    private void submitForExecution(Execution picked) {
        try {
            CompletableFuture
                    .runAsync(new ExecuteTask(picked), executorService)
                    .thenRun(() -> releaseExecutor(picked));
        } catch (RuntimeException | Error submitFailure) {
            // The task never started, so nothing else will ever release it. Leaving it in
            // currentlyProcessing would keep its heartbeat updated indefinitely.
            releaseExecutor(picked);
            throw submitFailure;
        }
    }

    /**
     * Acquires an executor permit and tries to pick {@code execution} in the repository.
     *
     * <p>The permit is given back if the pick returns empty or throws. On success the picked
     * execution is registered in {@link #currentlyProcessing} and the permit stays held until
     * {@link #releaseExecutor(Execution)} is called.
     *
     * @return the picked execution, or empty if the repository did not pick it
     * @throws NoAvailableExecutors if no permit could be acquired
     */
    private Optional<Execution> aquireExecutorAndPickExecution(Execution execution) {
        if (!executorsSemaphore.tryAcquire()) {
            throw new NoAvailableExecutors();
        }
        try {
            Optional<Execution> pickedExecution = taskRepository.pick(execution, clock.now());
            if (pickedExecution.isPresent()) {
                Execution picked = pickedExecution.get();
                currentlyProcessing.put(picked, new CurrentlyExecuting(picked, clock));
            } else {
                executorsSemaphore.release();
            }
            return pickedExecution;
        } catch (Throwable t) {
            executorsSemaphore.release();
            throw t;
        }
    }

    /** Returns the executor permit and removes {@code execution} from the processing map. */
    private void releaseExecutor(Execution execution) {
        executorsSemaphore.release();
        if (currentlyProcessing.remove(execution) == null) {
            LOG.error("Released execution was not found in collection of executions currently being processed. Should never happen.");
            statsRegistry.registerUnexpectedError();
        }
    }

    /**
     * Asks the repository for executions older than four heartbeat intervals and passes each
     * one to its task's dead-execution handler. A failing handler is logged and counted as an
     * unexpected error; the remaining executions are still processed.
     */
    void detectDeadExecutions() {
        LOG.debug("Checking for dead executions.");
        LocalDateTime now = clock.now();
        LocalDateTime oldAgeLimit = now.minus(getMaxAgeBeforeConsideredDead());
        List<Execution> oldExecutions = taskRepository.getOldExecutions(oldAgeLimit);
        if (oldExecutions.isEmpty()) {
            LOG.trace("No dead executions found.");
            return;
        }
        for (Execution execution : oldExecutions) {
            LOG.info("Found dead execution. Delegating handling to task. Execution: {}", execution);
            try {
                execution.taskInstance.getTask().getDeadExecutionHandler()
                        .deadExecution(execution, new ExecutionOperations(taskRepository, execution));
            } catch (Throwable e) {
                // Trailing Throwable argument is logged by SLF4J as the exception.
                LOG.error("Failed while handling dead execution {}. Will be tried again later.", execution, e);
                statsRegistry.registerUnexpectedError();
            }
        }
    }

    /**
     * Updates the heartbeat of every execution currently being processed, using one timestamp
     * for the whole pass. A failing update is logged and counted as an unexpected error; the
     * remaining executions are still updated.
     */
    void updateHeartbeats() {
        if (currentlyProcessing.isEmpty()) {
            LOG.trace("No executions to update heartbeats for. Skipping.");
            return;
        }
        LOG.debug("Updating heartbeats for {} executions being processed.", currentlyProcessing.size());
        LocalDateTime now = clock.now();
        // Iterate over a copy: workers add and remove entries concurrently.
        List<Execution> snapshot = new ArrayList<>(currentlyProcessing.keySet());
        for (Execution execution : snapshot) {
            LOG.trace("Updating heartbeat for execution: {}", execution);
            try {
                taskRepository.updateHeartbeat(execution, now);
            } catch (Throwable e) {
                LOG.error("Failed while updating heartbeat for execution {}. Will try again later.", execution, e);
                statsRegistry.registerUnexpectedError();
            }
        }
    }

    /** Age limit used by {@link #detectDeadExecutions()}: four heartbeat intervals. */
    private Duration getMaxAgeBeforeConsideredDead() {
        return heartbeatInterval.multipliedBy(4L);
    }

    /** Starts a {@link Builder} for the given data source and known tasks. */
    public static Builder create(DataSource dataSource, Task ... knownTasks) {
        return Scheduler.create(dataSource, Arrays.asList(knownTasks));
    }

    /** Starts a {@link Builder} for the given data source and known tasks. */
    public static Builder create(DataSource dataSource, List<Task> knownTasks) {
        return new Builder(dataSource, knownTasks);
    }

    /** Thrown when no executor permit is available for picking another execution. */
    public static class NoAvailableExecutors
    extends RuntimeException {
    }

    /**
     * Fluent builder for {@link Scheduler}.
     *
     * <p>Defaults: scheduler name {@code SchedulerName.Hostname}, 10 executor threads,
     * 10 second polling interval, 5 minute heartbeat interval, {@code StatsRegistry.NOOP}.
     */
    public static class Builder {
        private final DataSource dataSource;
        private SchedulerName schedulerName = new SchedulerName.Hostname();
        private int executorThreads = 10;
        private List<Task> knownTasks = new ArrayList<>();
        private List<OnStartup> startTasks = new ArrayList<>();
        private Waiter waiter = new Waiter(Duration.ofSeconds(10L));
        private StatsRegistry statsRegistry = StatsRegistry.NOOP;
        private Duration heartbeatInterval = Duration.ofMinutes(5L);

        public Builder(DataSource dataSource, List<Task> knownTasks) {
            this.dataSource = dataSource;
            this.knownTasks.addAll(knownTasks);
        }

        /** Varargs convenience for {@link #startTasks(List)}. */
        public <T extends Task & OnStartup> Builder startTasks(T ... startTasks) {
            return this.startTasks(Arrays.asList(startTasks));
        }

        /**
         * Registers tasks that are both known to the scheduler and notified on startup.
         *
         * <p>The type parameter requires both {@link Task} and {@link OnStartup} because each
         * element is added to the known-task list and to the on-startup list. The erasure is
         * still {@code Task}, so the compiled signature is unchanged.
         */
        public <T extends Task & OnStartup> Builder startTasks(List<T> startTasks) {
            this.knownTasks.addAll(startTasks);
            this.startTasks.addAll(startTasks);
            return this;
        }

        /** Sets how long the scheduler waits between polls for due executions. */
        public Builder pollingInterval(Duration pollingInterval) {
            this.waiter = new Waiter(pollingInterval);
            return this;
        }

        /** Sets the interval between heartbeat updates. */
        public Builder heartbeatInterval(Duration duration) {
            this.heartbeatInterval = duration;
            return this;
        }

        /** Sets the number of worker threads and thereby the maximum concurrent executions. */
        public Builder threads(int numberOfThreads) {
            this.executorThreads = numberOfThreads;
            return this;
        }

        public Builder statsRegistry(StatsRegistry statsRegistry) {
            this.statsRegistry = statsRegistry;
            return this;
        }

        public Builder schedulerName(SchedulerName schedulerName) {
            this.schedulerName = schedulerName;
            return this;
        }

        /**
         * Builds a scheduler backed by a {@link JdbcTaskRepository} on the configured data
         * source, using a {@link SystemClock}.
         */
        public Scheduler build() {
            TaskResolver taskResolver = new TaskResolver(TaskResolver.OnCannotResolve.WARN_ON_UNRESOLVED, knownTasks);
            JdbcTaskRepository taskRepository = new JdbcTaskRepository(dataSource, taskResolver, schedulerName);
            return new Scheduler(new SystemClock(), taskRepository, executorThreads, schedulerName, waiter, heartbeatInterval, statsRegistry, startTasks);
        }
    }

    /**
     * Repeatedly runs a piece of work, waiting between runs, until the scheduler state reports
     * that it is shutting down. Exceptions from the work are logged and counted, never
     * propagated, so a single failure does not end the loop.
     */
    static class RunUntilShutdown
    implements Runnable {
        private final Runnable toRun;
        private final Waiter waitBetweenRuns;
        private final SchedulerState schedulerState;
        private final StatsRegistry statsRegistry;

        public RunUntilShutdown(Runnable toRun, Waiter waitBetweenRuns, SchedulerState schedulerState, StatsRegistry statsRegistry) {
            this.toRun = toRun;
            this.waitBetweenRuns = waitBetweenRuns;
            this.schedulerState = schedulerState;
            this.statsRegistry = statsRegistry;
        }

        @Override
        public void run() {
            while (!schedulerState.isShuttingDown()) {
                try {
                    toRun.run();
                } catch (Throwable e) {
                    LOG.error("Unhandled exception. Will keep running.", e);
                    statsRegistry.registerUnexpectedError();
                }
                try {
                    waitBetweenRuns.doWait();
                } catch (InterruptedException interruptedException) {
                    // The interrupt flag is deliberately not restored: if the loop keeps
                    // running, a set flag would make the next wait fail again immediately.
                    if (schedulerState.isShuttingDown()) {
                        LOG.debug("Thread '{}' interrupted due to shutdown.", Thread.currentThread().getName());
                    } else {
                        LOG.error("Unexpected interruption of thread. Will keep running.", interruptedException);
                        statsRegistry.registerUnexpectedError();
                    }
                }
            }
        }
    }

    /**
     * Runs one picked execution and reports the outcome to the task's completion handler.
     * Nothing thrown by the task or by the completion handler escapes {@link #run()}.
     */
    private class ExecuteTask
    implements Runnable {
        private final Execution execution;

        private ExecuteTask(Execution execution) {
            this.execution = execution;
        }

        @Override
        public void run() {
            try {
                Task task = execution.taskInstance.getTask();
                LOG.debug("Executing {}", execution);
                task.execute(execution.taskInstance, new ExecutionContext(schedulerState));
                LOG.debug("Execution done");
                complete(execution, ExecutionComplete.Result.OK);
            } catch (RuntimeException unhandledException) {
                LOG.warn("Unhandled exception during execution. Treating as failure.", unhandledException);
                complete(execution, ExecutionComplete.Result.FAILED);
            } catch (Throwable unhandledError) {
                // Errors are logged at a higher level than runtime exceptions but handled the same way.
                LOG.error("Error during execution. Treating as failure.", unhandledError);
                complete(execution, ExecutionComplete.Result.FAILED);
            }
        }

        /**
         * Invokes the task's completion handler with the given result. A failure here is
         * logged and counted as an unexpected error.
         */
        private void complete(Execution execution, ExecutionComplete.Result result) {
            try {
                Task task = execution.taskInstance.getTask();
                task.getCompletionHandler().complete(
                        new ExecutionComplete(execution, clock.now(), result),
                        new ExecutionOperations(taskRepository, execution));
            } catch (Throwable e) {
                statsRegistry.registerUnexpectedError();
                LOG.error("Failed while completing execution {}. Execution will likely remain scheduled and locked/picked. The execution should be detected as dead in {}, and handled according to the tasks DeadExecutionHandler.",
                        execution, getMaxAgeBeforeConsideredDead(), e);
            }
        }
    }
}

