/*
 * Decompiled with CFR 0.152.
 */
package com.github.kagkarlsson.scheduler;

import com.github.kagkarlsson.scheduler.Clock;
import com.github.kagkarlsson.scheduler.CurrentlyExecuting;
import com.github.kagkarlsson.scheduler.DueExecutionsBatch;
import com.github.kagkarlsson.scheduler.ExecutorUtils;
import com.github.kagkarlsson.scheduler.RunUntilShutdown;
import com.github.kagkarlsson.scheduler.ScheduledExecution;
import com.github.kagkarlsson.scheduler.ScheduledExecutionsFilter;
import com.github.kagkarlsson.scheduler.SchedulerBuilder;
import com.github.kagkarlsson.scheduler.SchedulerClient;
import com.github.kagkarlsson.scheduler.SchedulerClientEventListener;
import com.github.kagkarlsson.scheduler.SchedulerName;
import com.github.kagkarlsson.scheduler.SchedulerState;
import com.github.kagkarlsson.scheduler.TaskRepository;
import com.github.kagkarlsson.scheduler.TaskResolver;
import com.github.kagkarlsson.scheduler.TriggerCheckForDueExecutions;
import com.github.kagkarlsson.scheduler.Waiter;
import com.github.kagkarlsson.scheduler.concurrent.LoggingRunnable;
import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
import com.github.kagkarlsson.scheduler.logging.LogLevel;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.CompletionHandler;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.ExecutionComplete;
import com.github.kagkarlsson.scheduler.task.ExecutionContext;
import com.github.kagkarlsson.scheduler.task.ExecutionOperations;
import com.github.kagkarlsson.scheduler.task.OnStartup;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import com.github.kagkarlsson.scheduler.task.TaskInstanceId;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Scheduler
implements SchedulerClient {
    public static final double TRIGGER_NEXT_BATCH_WHEN_AVAILABLE_THREADS_RATIO = 0.5;
    public static final String THREAD_PREFIX = "db-scheduler";
    private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
    private final SchedulerClient delegate;
    private final Clock clock;
    private final TaskRepository schedulerTaskRepository;
    private final TaskResolver taskResolver;
    private int threadpoolSize;
    private final ExecutorService executorService;
    private final Waiter executeDueWaiter;
    private final Duration deleteUnresolvedAfter;
    private final Duration shutdownMaxWait;
    protected final List<OnStartup> onStartup;
    private final Waiter detectDeadWaiter;
    private final Duration heartbeatInterval;
    private final StatsRegistry statsRegistry;
    private final int pollingLimit;
    private final ExecutorService dueExecutor;
    private final ExecutorService detectDeadExecutor;
    private final ExecutorService updateHeartbeatExecutor;
    private final Map<Execution, CurrentlyExecuting> currentlyProcessing = Collections.synchronizedMap(new HashMap());
    private final Waiter heartbeatWaiter;
    private final SchedulerState.SettableSchedulerState schedulerState = new SchedulerState.SettableSchedulerState();
    private int currentGenerationNumber = 1;
    private final ConfigurableLogger failureLogger;

    protected Scheduler(Clock clock, TaskRepository schedulerTaskRepository, TaskRepository clientTaskRepository, TaskResolver taskResolver, int threadpoolSize, ExecutorService executorService, SchedulerName schedulerName, Waiter executeDueWaiter, Duration heartbeatInterval, boolean enableImmediateExecution, StatsRegistry statsRegistry, int pollingLimit, Duration deleteUnresolvedAfter, Duration shutdownMaxWait, LogLevel logLevel, boolean logStackTrace, List<OnStartup> onStartup) {
        this.clock = clock;
        this.schedulerTaskRepository = schedulerTaskRepository;
        this.taskResolver = taskResolver;
        this.threadpoolSize = threadpoolSize;
        this.executorService = executorService;
        this.executeDueWaiter = executeDueWaiter;
        this.deleteUnresolvedAfter = deleteUnresolvedAfter;
        this.shutdownMaxWait = shutdownMaxWait;
        this.onStartup = onStartup;
        this.detectDeadWaiter = new Waiter(heartbeatInterval.multipliedBy(2L), clock);
        this.heartbeatInterval = heartbeatInterval;
        this.heartbeatWaiter = new Waiter(heartbeatInterval, clock);
        this.statsRegistry = statsRegistry;
        this.pollingLimit = pollingLimit;
        this.dueExecutor = Executors.newSingleThreadExecutor(ExecutorUtils.defaultThreadFactoryWithPrefix("db-scheduler-execute-due-"));
        this.detectDeadExecutor = Executors.newSingleThreadExecutor(ExecutorUtils.defaultThreadFactoryWithPrefix("db-scheduler-detect-dead-"));
        this.updateHeartbeatExecutor = Executors.newSingleThreadExecutor(ExecutorUtils.defaultThreadFactoryWithPrefix("db-scheduler-update-heartbeat-"));
        SchedulerClientEventListener earlyExecutionListener = enableImmediateExecution ? new TriggerCheckForDueExecutions(this.schedulerState, clock, executeDueWaiter) : SchedulerClientEventListener.NOOP;
        this.delegate = new SchedulerClient.StandardSchedulerClient(clientTaskRepository, earlyExecutionListener);
        this.failureLogger = ConfigurableLogger.create(LOG, logLevel, logStackTrace);
    }

    public void start() {
        LOG.info("Starting scheduler.");
        this.executeOnStartup();
        this.dueExecutor.submit(new RunUntilShutdown(this::executeDue, this.executeDueWaiter, this.schedulerState, this.statsRegistry));
        this.detectDeadExecutor.submit(new RunUntilShutdown(this::detectDeadExecutions, this.detectDeadWaiter, this.schedulerState, this.statsRegistry));
        this.updateHeartbeatExecutor.submit(new RunUntilShutdown(this::updateHeartbeats, this.heartbeatWaiter, this.schedulerState, this.statsRegistry));
        this.schedulerState.setStarted();
    }

    protected void executeOnStartup() {
        this.onStartup.forEach(os -> {
            try {
                os.onStartup(this, this.clock);
            }
            catch (Exception e) {
                LOG.error("Unexpected error while executing OnStartup tasks. Continuing.", (Throwable)e);
                this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
            }
        });
    }

    public void stop() {
        this.stop(Duration.ofSeconds(1L), Duration.ofSeconds(5L));
    }

    void stop(Duration utilExecutorsWaitBeforeInterrupt, Duration utilExecutorsWaitAfterInterrupt) {
        if (this.schedulerState.isShuttingDown()) {
            LOG.warn("Multiple calls to 'stop()'. Scheduler is already stopping.");
            return;
        }
        this.schedulerState.setIsShuttingDown();
        LOG.info("Shutting down Scheduler.");
        if (!ExecutorUtils.shutdownAndAwaitTermination(this.dueExecutor, utilExecutorsWaitBeforeInterrupt, utilExecutorsWaitAfterInterrupt)) {
            LOG.warn("Failed to shutdown due-executor properly.");
        }
        if (!ExecutorUtils.shutdownAndAwaitTermination(this.detectDeadExecutor, utilExecutorsWaitBeforeInterrupt, utilExecutorsWaitAfterInterrupt)) {
            LOG.warn("Failed to shutdown detect-dead-executor properly.");
        }
        if (!ExecutorUtils.shutdownAndAwaitTermination(this.updateHeartbeatExecutor, utilExecutorsWaitBeforeInterrupt, utilExecutorsWaitAfterInterrupt)) {
            LOG.warn("Failed to shutdown update-heartbeat-executor properly.");
        }
        LOG.info("Letting running executions finish. Will wait up to 2x{}.", (Object)this.shutdownMaxWait);
        Instant startShutdown = this.clock.now();
        if (ExecutorUtils.shutdownAndAwaitTermination(this.executorService, this.shutdownMaxWait, this.shutdownMaxWait)) {
            LOG.info("Scheduler stopped.");
        } else {
            LOG.warn("Scheduler stopped, but some tasks did not complete. Was currently running the following executions:\n{}", (Object)new ArrayList<Execution>(this.currentlyProcessing.keySet()).stream().map(Execution::toString).collect(Collectors.joining("\n")));
        }
        Duration shutdownTime = Duration.between(startShutdown, this.clock.now());
        if (this.shutdownMaxWait.toMillis() > Duration.ofMinutes(1L).toMillis() && shutdownTime.toMillis() >= this.shutdownMaxWait.toMillis()) {
            LOG.info("Shutdown of the scheduler executor service took {}. Consider regularly checking for 'executionContext.getSchedulerState().isShuttingDown()' in task execution-handler and abort when scheduler is shutting down.", (Object)shutdownTime);
        }
    }

    public SchedulerState getSchedulerState() {
        return this.schedulerState;
    }

    @Override
    public <T> void schedule(TaskInstance<T> taskInstance, Instant executionTime) {
        this.delegate.schedule(taskInstance, executionTime);
    }

    @Override
    public void reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime) {
        this.delegate.reschedule(taskInstanceId, newExecutionTime);
    }

    @Override
    public <T> void reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime, T newData) {
        this.delegate.reschedule(taskInstanceId, newExecutionTime, newData);
    }

    @Override
    public void cancel(TaskInstanceId taskInstanceId) {
        this.delegate.cancel(taskInstanceId);
    }

    @Override
    public void fetchScheduledExecutions(Consumer<ScheduledExecution<Object>> consumer) {
        this.delegate.fetchScheduledExecutions(consumer);
    }

    @Override
    public void fetchScheduledExecutions(ScheduledExecutionsFilter filter, Consumer<ScheduledExecution<Object>> consumer) {
        this.delegate.fetchScheduledExecutions(filter, consumer);
    }

    @Override
    public <T> void fetchScheduledExecutionsForTask(String taskName, Class<T> dataClass, Consumer<ScheduledExecution<T>> consumer) {
        this.delegate.fetchScheduledExecutionsForTask(taskName, dataClass, consumer);
    }

    @Override
    public <T> void fetchScheduledExecutionsForTask(String taskName, Class<T> dataClass, ScheduledExecutionsFilter filter, Consumer<ScheduledExecution<T>> consumer) {
        this.delegate.fetchScheduledExecutionsForTask(taskName, dataClass, filter, consumer);
    }

    @Override
    public Optional<ScheduledExecution<Object>> getScheduledExecution(TaskInstanceId taskInstanceId) {
        return this.delegate.getScheduledExecution(taskInstanceId);
    }

    public List<Execution> getFailingExecutions(Duration failingAtLeastFor) {
        return this.schedulerTaskRepository.getExecutionsFailingLongerThan(failingAtLeastFor);
    }

    public void triggerCheckForDueExecutions() {
        this.executeDueWaiter.wakeOrSkipNextWait();
    }

    public List<CurrentlyExecuting> getCurrentlyExecuting() {
        return new ArrayList<CurrentlyExecuting>(this.currentlyProcessing.values());
    }

    protected void executeDue() {
        Instant now = this.clock.now();
        List<Execution> dueExecutions = this.schedulerTaskRepository.getDue(now, this.pollingLimit);
        LOG.trace("Found {} task instances due for execution", (Object)dueExecutions.size());
        ++this.currentGenerationNumber;
        DueExecutionsBatch newDueBatch = new DueExecutionsBatch(this.threadpoolSize, this.currentGenerationNumber, dueExecutions.size(), this.pollingLimit == dueExecutions.size());
        for (Execution e : dueExecutions) {
            this.executorService.execute(new PickAndExecute(e, newDueBatch));
        }
        this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_EXECUTE_DUE);
    }

    protected void detectDeadExecutions() {
        LOG.debug("Deleting executions with unresolved tasks.");
        this.taskResolver.getUnresolvedTaskNames(this.deleteUnresolvedAfter).forEach(taskName -> {
            LOG.warn("Deleting all executions for task with name '{}'. They have been unresolved for more than {}", taskName, (Object)this.deleteUnresolvedAfter);
            int removed = this.schedulerTaskRepository.removeExecutions((String)taskName);
            LOG.info("Removed {} executions", (Object)removed);
            this.taskResolver.clearUnresolved((String)taskName);
        });
        LOG.debug("Checking for dead executions.");
        Instant now = this.clock.now();
        Instant oldAgeLimit = now.minus(this.getMaxAgeBeforeConsideredDead());
        List<Execution> oldExecutions = this.schedulerTaskRepository.getDeadExecutions(oldAgeLimit);
        if (!oldExecutions.isEmpty()) {
            oldExecutions.forEach(execution -> {
                LOG.info("Found dead execution. Delegating handling to task. Execution: " + execution);
                try {
                    Optional<Task> task = this.taskResolver.resolve(execution.taskInstance.getTaskName());
                    if (task.isPresent()) {
                        this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.DEAD_EXECUTION);
                        task.get().getDeadExecutionHandler().deadExecution((Execution)execution, new ExecutionOperations(this.schedulerTaskRepository, (Execution)execution));
                    } else {
                        LOG.error("Failed to find implementation for task with name '{}' for detected dead execution. Either delete the execution from the databaser, or add an implementation for it.", (Object)execution.taskInstance.getTaskName());
                    }
                }
                catch (Throwable e) {
                    LOG.error("Failed while handling dead execution {}. Will be tried again later.", execution, (Object)e);
                    this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
                }
            });
        } else {
            LOG.trace("No dead executions found.");
        }
        this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_DETECT_DEAD);
    }

    void updateHeartbeats() {
        if (this.currentlyProcessing.isEmpty()) {
            LOG.trace("No executions to update heartbeats for. Skipping.");
            return;
        }
        LOG.debug("Updating heartbeats for {} executions being processed.", (Object)this.currentlyProcessing.size());
        Instant now = this.clock.now();
        new ArrayList<Execution>(this.currentlyProcessing.keySet()).forEach(execution -> {
            LOG.trace("Updating heartbeat for execution: " + execution);
            try {
                this.schedulerTaskRepository.updateHeartbeat((Execution)execution, now);
            }
            catch (Throwable e) {
                LOG.error("Failed while updating heartbeat for execution {}. Will try again later.", execution, (Object)e);
                this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
            }
        });
        this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_UPDATE_HEARTBEATS);
    }

    private Duration getMaxAgeBeforeConsideredDead() {
        return this.heartbeatInterval.multipliedBy(4L);
    }

    public static SchedulerBuilder create(DataSource dataSource, Task<?> ... knownTasks) {
        return Scheduler.create(dataSource, Arrays.asList(knownTasks));
    }

    public static SchedulerBuilder create(DataSource dataSource, List<Task<?>> knownTasks) {
        return new SchedulerBuilder(dataSource, knownTasks);
    }

    private class PickAndExecute
    extends LoggingRunnable {
        private Execution candidate;
        private DueExecutionsBatch addedDueExecutionsBatch;

        public PickAndExecute(Execution candidate, DueExecutionsBatch dueExecutionsBatch) {
            this.candidate = candidate;
            this.addedDueExecutionsBatch = dueExecutionsBatch;
        }

        @Override
        public void runButLogExceptions() {
            if (Scheduler.this.schedulerState.isShuttingDown()) {
                LOG.info("Scheduler has been shutdown. Skipping fetched due execution: " + this.candidate.taskInstance.getTaskAndInstance());
                return;
            }
            try {
                if (this.addedDueExecutionsBatch.isOlderGenerationThan(Scheduler.this.currentGenerationNumber)) {
                    this.addedDueExecutionsBatch.markBatchAsStale();
                    Scheduler.this.statsRegistry.register(StatsRegistry.CandidateStatsEvent.STALE);
                    LOG.trace("Skipping queued execution (current generationNumber: {}, execution generationNumber: {})", (Object)Scheduler.this.currentGenerationNumber, (Object)this.addedDueExecutionsBatch.getGenerationNumber());
                    return;
                }
                Optional<Execution> pickedExecution = Scheduler.this.schedulerTaskRepository.pick(this.candidate, Scheduler.this.clock.now());
                if (!pickedExecution.isPresent()) {
                    LOG.debug("Execution picked by another scheduler. Continuing to next due execution.");
                    Scheduler.this.statsRegistry.register(StatsRegistry.CandidateStatsEvent.ALREADY_PICKED);
                    return;
                }
                Scheduler.this.currentlyProcessing.put(pickedExecution.get(), new CurrentlyExecuting(pickedExecution.get(), Scheduler.this.clock));
                try {
                    Scheduler.this.statsRegistry.register(StatsRegistry.CandidateStatsEvent.EXECUTED);
                    this.executePickedExecution(pickedExecution.get());
                }
                finally {
                    if (Scheduler.this.currentlyProcessing.remove(pickedExecution.get()) == null) {
                        LOG.warn("Released execution was not found in collection of executions currently being processed. Should never happen.");
                    }
                }
            }
            finally {
                this.addedDueExecutionsBatch.oneExecutionDone(Scheduler.this::triggerCheckForDueExecutions);
            }
        }

        private void executePickedExecution(Execution execution) {
            Optional<Task> task = Scheduler.this.taskResolver.resolve(execution.taskInstance.getTaskName());
            if (!task.isPresent()) {
                LOG.error("Failed to find implementation for task with name '{}'. Should have been excluded in JdbcRepository.", (Object)execution.taskInstance.getTaskName());
                Scheduler.this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
                return;
            }
            Instant executionStarted = Scheduler.this.clock.now();
            try {
                LOG.debug("Executing " + execution);
                CompletionHandler completion = task.get().execute(execution.taskInstance, new ExecutionContext(Scheduler.this.schedulerState, execution, Scheduler.this));
                LOG.debug("Execution done");
                this.complete(completion, execution, executionStarted);
                Scheduler.this.statsRegistry.register(StatsRegistry.ExecutionStatsEvent.COMPLETED);
            }
            catch (RuntimeException unhandledException) {
                this.failure(task.get(), execution, unhandledException, executionStarted, "Unhandled exception");
                Scheduler.this.statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);
            }
            catch (Throwable unhandledError) {
                this.failure(task.get(), execution, unhandledError, executionStarted, "Error");
                Scheduler.this.statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);
            }
        }

        private void complete(CompletionHandler completion, Execution execution, Instant executionStarted) {
            ExecutionComplete completeEvent = ExecutionComplete.success(execution, executionStarted, Scheduler.this.clock.now());
            try {
                completion.complete(completeEvent, new ExecutionOperations(Scheduler.this.schedulerTaskRepository, execution));
                Scheduler.this.statsRegistry.registerSingleCompletedExecution(completeEvent);
            }
            catch (Throwable e) {
                Scheduler.this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.COMPLETIONHANDLER_ERROR);
                Scheduler.this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
                LOG.error("Failed while completing execution {}. Execution will likely remain scheduled and locked/picked. The execution should be detected as dead in {}, and handled according to the tasks DeadExecutionHandler.", new Object[]{execution, Scheduler.this.getMaxAgeBeforeConsideredDead(), e});
            }
        }

        private void failure(Task task, Execution execution, Throwable cause, Instant executionStarted, String errorMessagePrefix) {
            String logMessage = errorMessagePrefix + " during execution of task with name '{}'. Treating as failure.";
            Scheduler.this.failureLogger.log(logMessage, cause, task.getName());
            ExecutionComplete completeEvent = ExecutionComplete.failure(execution, executionStarted, Scheduler.this.clock.now(), cause);
            try {
                task.getFailureHandler().onFailure(completeEvent, new ExecutionOperations(Scheduler.this.schedulerTaskRepository, execution));
                Scheduler.this.statsRegistry.registerSingleCompletedExecution(completeEvent);
            }
            catch (Throwable e) {
                Scheduler.this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.FAILUREHANDLER_ERROR);
                Scheduler.this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
                LOG.error("Failed while completing execution {}. Execution will likely remain scheduled and locked/picked. The execution should be detected as dead in {}, and handled according to the tasks DeadExecutionHandler.", new Object[]{execution, Scheduler.this.getMaxAgeBeforeConsideredDead(), e});
            }
        }
    }
}

