/*
 * Decompiled with CFR 0.152.
 */
package com.github.kagkarlsson.scheduler;

import com.github.kagkarlsson.scheduler.Clock;
import com.github.kagkarlsson.scheduler.CurrentlyExecuting;
import com.github.kagkarlsson.scheduler.Executor;
import com.github.kagkarlsson.scheduler.ExecutorUtils;
import com.github.kagkarlsson.scheduler.FetchCandidates;
import com.github.kagkarlsson.scheduler.HeartbeatConfig;
import com.github.kagkarlsson.scheduler.HeartbeatState;
import com.github.kagkarlsson.scheduler.LockAndFetchCandidates;
import com.github.kagkarlsson.scheduler.PollStrategy;
import com.github.kagkarlsson.scheduler.PollingStrategyConfig;
import com.github.kagkarlsson.scheduler.RunAndLogErrors;
import com.github.kagkarlsson.scheduler.RunUntilShutdown;
import com.github.kagkarlsson.scheduler.ScheduledExecution;
import com.github.kagkarlsson.scheduler.ScheduledExecutionsFilter;
import com.github.kagkarlsson.scheduler.SchedulerBuilder;
import com.github.kagkarlsson.scheduler.SchedulerClient;
import com.github.kagkarlsson.scheduler.SchedulerName;
import com.github.kagkarlsson.scheduler.SchedulerState;
import com.github.kagkarlsson.scheduler.TaskRepository;
import com.github.kagkarlsson.scheduler.TaskResolver;
import com.github.kagkarlsson.scheduler.Waiter;
import com.github.kagkarlsson.scheduler.event.ExecutionInterceptor;
import com.github.kagkarlsson.scheduler.event.SchedulerListener;
import com.github.kagkarlsson.scheduler.event.SchedulerListeners;
import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
import com.github.kagkarlsson.scheduler.logging.LogLevel;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.ExecutionComplete;
import com.github.kagkarlsson.scheduler.task.ExecutionOperations;
import com.github.kagkarlsson.scheduler.task.OnStartup;
import com.github.kagkarlsson.scheduler.task.SchedulableInstance;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import com.github.kagkarlsson.scheduler.task.TaskInstanceId;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Scheduler
implements SchedulerClient {
    public static final double TRIGGER_NEXT_BATCH_WHEN_AVAILABLE_THREADS_RATIO = 0.5;
    public static final String THREAD_PREFIX = "db-scheduler";
    private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
    private final SchedulerClient delegate;
    final Clock clock;
    final TaskRepository schedulerTaskRepository;
    final TaskResolver taskResolver;
    protected final PollStrategy executeDueStrategy;
    protected final Executor executor;
    private final ScheduledExecutorService housekeeperExecutor;
    private final HeartbeatConfig heartbeatConfig;
    private final int numberOfMissedHeartbeatsBeforeDead;
    int threadpoolSize;
    private final Waiter executeDueWaiter;
    private final Duration deleteUnresolvedAfter;
    private final Duration shutdownMaxWait;
    protected final List<OnStartup> onStartup;
    private final Waiter detectDeadWaiter;
    private final Duration heartbeatInterval;
    final SchedulerListeners schedulerListeners;
    private final ExecutorService dueExecutor;
    private final Waiter heartbeatWaiter;
    final SchedulerState.SettableSchedulerState schedulerState = new SchedulerState.SettableSchedulerState();
    final ConfigurableLogger failureLogger;

    protected Scheduler(Clock clock, TaskRepository schedulerTaskRepository, TaskRepository clientTaskRepository, TaskResolver taskResolver, int threadpoolSize, ExecutorService executorService, SchedulerName schedulerName, Waiter executeDueWaiter, Duration heartbeatInterval, int numberOfMissedHeartbeatsBeforeDead, List<SchedulerListener> schedulerListeners, List<ExecutionInterceptor> executionInterceptors, PollingStrategyConfig pollingStrategyConfig, Duration deleteUnresolvedAfter, Duration shutdownMaxWait, LogLevel logLevel, boolean logStackTrace, List<OnStartup> onStartup, ExecutorService dueExecutor, ScheduledExecutorService housekeeperExecutor) {
        this.clock = clock;
        this.schedulerTaskRepository = schedulerTaskRepository;
        this.taskResolver = taskResolver;
        this.threadpoolSize = threadpoolSize;
        this.executor = new Executor(executorService, clock);
        this.executeDueWaiter = executeDueWaiter;
        this.deleteUnresolvedAfter = deleteUnresolvedAfter;
        this.shutdownMaxWait = shutdownMaxWait;
        this.onStartup = onStartup;
        this.detectDeadWaiter = new Waiter(heartbeatInterval.multipliedBy(2L), clock);
        this.heartbeatInterval = heartbeatInterval;
        this.numberOfMissedHeartbeatsBeforeDead = numberOfMissedHeartbeatsBeforeDead;
        this.heartbeatWaiter = new Waiter(heartbeatInterval, clock);
        this.heartbeatConfig = new HeartbeatConfig(heartbeatInterval, numberOfMissedHeartbeatsBeforeDead, this.getMaxAgeBeforeConsideredDead());
        this.schedulerListeners = new SchedulerListeners(schedulerListeners);
        this.dueExecutor = dueExecutor;
        this.housekeeperExecutor = housekeeperExecutor;
        this.delegate = new SchedulerClient.StandardSchedulerClient(clientTaskRepository, this.schedulerListeners, clock);
        this.failureLogger = ConfigurableLogger.create(LOG, logLevel, logStackTrace);
        if (pollingStrategyConfig.type == PollingStrategyConfig.Type.LOCK_AND_FETCH) {
            schedulerTaskRepository.verifySupportsLockAndFetch();
            this.executeDueStrategy = new LockAndFetchCandidates(this.executor, schedulerTaskRepository, this, threadpoolSize, this.schedulerListeners, executionInterceptors, this.schedulerState, this.failureLogger, taskResolver, clock, pollingStrategyConfig, this::triggerCheckForDueExecutions, this.heartbeatConfig);
        } else if (pollingStrategyConfig.type == PollingStrategyConfig.Type.FETCH) {
            this.executeDueStrategy = new FetchCandidates(this.executor, schedulerTaskRepository, this, threadpoolSize, this.schedulerListeners, executionInterceptors, this.schedulerState, this.failureLogger, taskResolver, clock, pollingStrategyConfig, this::triggerCheckForDueExecutions, this.heartbeatConfig);
        } else {
            throw new IllegalArgumentException("Unknown polling-strategy type: " + pollingStrategyConfig.type);
        }
        LOG.info("Using polling-strategy: " + pollingStrategyConfig.describe());
    }

    public void registerSchedulerListener(SchedulerListener listener) {
        this.schedulerListeners.add(listener);
    }

    public void start() {
        LOG.info("Starting scheduler.");
        this.executeOnStartup();
        this.dueExecutor.submit(new RunUntilShutdown(this.executeDueStrategy, this.executeDueWaiter, this.schedulerState, this.schedulerListeners));
        this.housekeeperExecutor.scheduleWithFixedDelay(new RunAndLogErrors(this::detectDeadExecutions, this.schedulerListeners), 0L, this.detectDeadWaiter.getWaitDuration().toMillis(), TimeUnit.MILLISECONDS);
        this.housekeeperExecutor.scheduleWithFixedDelay(new RunAndLogErrors(this::updateHeartbeats, this.schedulerListeners), 0L, this.heartbeatWaiter.getWaitDuration().toMillis(), TimeUnit.MILLISECONDS);
        this.schedulerState.setStarted();
    }

    protected void executeDue() {
        this.executeDueStrategy.run();
    }

    protected void executeOnStartup() {
        SchedulerClient.StandardSchedulerClient onStartupClient = new SchedulerClient.StandardSchedulerClient(this.schedulerTaskRepository, this.clock);
        this.onStartup.forEach(os -> {
            try {
                os.onStartup(onStartupClient, this.clock);
            }
            catch (Exception e) {
                LOG.error("Unexpected error while executing OnStartup tasks. Continuing.", (Throwable)e);
                this.schedulerListeners.onSchedulerEvent(SchedulerListener.SchedulerEventType.UNEXPECTED_ERROR);
            }
        });
    }

    public void stop() {
        this.stop(Duration.ofSeconds(1L), Duration.ofSeconds(5L));
    }

    void stop(Duration utilExecutorsWaitBeforeInterrupt, Duration utilExecutorsWaitAfterInterrupt) {
        if (this.schedulerState.isShuttingDown()) {
            LOG.warn("Multiple calls to 'stop()'. Scheduler is already stopping.");
            return;
        }
        this.schedulerState.setIsShuttingDown();
        LOG.info("Shutting down Scheduler.");
        if (this.executeDueWaiter.isWaiting()) {
            this.dueExecutor.shutdownNow();
            if (!ExecutorUtils.awaitTermination(this.dueExecutor, utilExecutorsWaitAfterInterrupt)) {
                LOG.warn("Failed to shutdown due-executor properly.");
            }
        } else if (!ExecutorUtils.shutdownAndAwaitTermination(this.dueExecutor, utilExecutorsWaitBeforeInterrupt, utilExecutorsWaitAfterInterrupt)) {
            LOG.warn("Failed to shutdown due-executor properly.");
        }
        this.executor.stop(this.shutdownMaxWait);
        if (!ExecutorUtils.shutdownAndAwaitTermination(this.housekeeperExecutor, utilExecutorsWaitBeforeInterrupt, utilExecutorsWaitAfterInterrupt)) {
            LOG.warn("Failed to shutdown housekeeper-executor properly.");
        }
    }

    public void pause() {
        this.schedulerState.setPaused(true);
    }

    public void resume() {
        this.schedulerState.setPaused(false);
    }

    public SchedulerState getSchedulerState() {
        return this.schedulerState;
    }

    @Override
    public <T> void schedule(SchedulableInstance<T> schedulableInstance) {
        this.delegate.schedule(schedulableInstance);
    }

    @Override
    public <T> boolean scheduleIfNotExists(TaskInstance<T> taskInstance, Instant executionTime) {
        return this.delegate.scheduleIfNotExists(taskInstance, executionTime);
    }

    @Override
    public <T> boolean scheduleIfNotExists(SchedulableInstance<T> schedulableInstance) {
        return this.delegate.scheduleIfNotExists(schedulableInstance);
    }

    @Override
    public void scheduleBatch(List<TaskInstance<?>> taskInstances, Instant executionTime) {
        this.delegate.scheduleBatch(taskInstances, executionTime);
    }

    @Override
    public void scheduleBatch(List<SchedulableInstance<?>> schedulableInstances) {
        this.delegate.scheduleBatch(schedulableInstances);
    }

    @Override
    public <T> void schedule(TaskInstance<T> taskInstance, Instant executionTime) {
        this.delegate.schedule(taskInstance, executionTime);
    }

    @Override
    public boolean reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime) {
        return this.delegate.reschedule(taskInstanceId, newExecutionTime);
    }

    @Override
    public <T> boolean reschedule(SchedulableInstance<T> schedulableInstance) {
        return this.delegate.reschedule(schedulableInstance);
    }

    @Override
    public <T> boolean reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime, T newData) {
        return this.delegate.reschedule(taskInstanceId, newExecutionTime, newData);
    }

    @Override
    public <T> boolean schedule(TaskInstance<T> taskInstance, Instant executionTime, SchedulerClient.ScheduleOptions scheduleOptions) {
        return this.delegate.schedule(taskInstance, executionTime, scheduleOptions);
    }

    @Override
    public <T> boolean schedule(SchedulableInstance<T> schedulableInstance, SchedulerClient.ScheduleOptions scheduleOptions) {
        return this.delegate.schedule(schedulableInstance, scheduleOptions);
    }

    @Override
    public void cancel(TaskInstanceId taskInstanceId) {
        this.delegate.cancel(taskInstanceId);
    }

    @Override
    public void fetchScheduledExecutions(Consumer<ScheduledExecution<Object>> consumer) {
        this.delegate.fetchScheduledExecutions(consumer);
    }

    @Override
    public void fetchScheduledExecutions(ScheduledExecutionsFilter filter, Consumer<ScheduledExecution<Object>> consumer) {
        this.delegate.fetchScheduledExecutions(filter, consumer);
    }

    @Override
    public <T> void fetchScheduledExecutionsForTask(String taskName, Class<T> dataClass, Consumer<ScheduledExecution<T>> consumer) {
        this.delegate.fetchScheduledExecutionsForTask(taskName, dataClass, consumer);
    }

    @Override
    public <T> void fetchScheduledExecutionsForTask(String taskName, Class<T> dataClass, ScheduledExecutionsFilter filter, Consumer<ScheduledExecution<T>> consumer) {
        this.delegate.fetchScheduledExecutionsForTask(taskName, dataClass, filter, consumer);
    }

    @Override
    public Optional<ScheduledExecution<Object>> getScheduledExecution(TaskInstanceId taskInstanceId) {
        return this.delegate.getScheduledExecution(taskInstanceId);
    }

    public List<Execution> getFailingExecutions(Duration failingAtLeastFor) {
        return this.schedulerTaskRepository.getExecutionsFailingLongerThan(failingAtLeastFor);
    }

    public void triggerCheckForDueExecutions() {
        this.executeDueWaiter.wakeOrSkipNextWait();
    }

    public List<CurrentlyExecuting> getCurrentlyExecuting() {
        return this.executor.getCurrentlyExecuting();
    }

    public List<CurrentlyExecuting> getCurrentlyExecutingWithStaleHeartbeat() {
        return this.executor.getCurrentlyExecuting().stream().filter(c -> c.getHeartbeatState().hasStaleHeartbeat()).collect(Collectors.toList());
    }

    protected void detectDeadExecutions() {
        LOG.debug("Deleting executions with unresolved tasks.");
        this.taskResolver.getUnresolvedTaskNames(this.deleteUnresolvedAfter).forEach(taskName -> {
            LOG.warn("Deleting all executions for task with name '{}'. They have been unresolved for more than {}", taskName, (Object)this.deleteUnresolvedAfter);
            int removed = this.schedulerTaskRepository.removeExecutions((String)taskName);
            LOG.info("Removed {} executions", (Object)removed);
            this.taskResolver.clearUnresolved((String)taskName);
        });
        LOG.debug("Checking for dead executions.");
        Instant now = this.clock.now();
        Instant oldAgeLimit = now.minus(this.getMaxAgeBeforeConsideredDead());
        List<Execution> oldExecutions = this.schedulerTaskRepository.getDeadExecutions(oldAgeLimit);
        if (!oldExecutions.isEmpty()) {
            oldExecutions.forEach(execution -> {
                LOG.info("Found dead execution. Delegating handling to task. Execution: " + execution);
                try {
                    Optional<Task> task = this.taskResolver.resolve(execution.taskInstance.getTaskName());
                    if (task.isPresent()) {
                        this.schedulerListeners.onSchedulerEvent(SchedulerListener.SchedulerEventType.DEAD_EXECUTION);
                        this.schedulerListeners.onExecutionDead((Execution)execution);
                        task.get().getDeadExecutionHandler().deadExecution(ExecutionComplete.failure(execution, now, now, null), new ExecutionOperations(this.schedulerTaskRepository, this.schedulerListeners, (Execution)execution));
                    } else {
                        LOG.error("Failed to find implementation for task with name '{}' for detected dead execution. Either delete the execution from the databaser, or add an implementation for it.", (Object)execution.taskInstance.getTaskName());
                    }
                }
                catch (Throwable e) {
                    LOG.error("Failed while handling dead execution {}. Will be tried again later.", execution, (Object)e);
                    this.schedulerListeners.onSchedulerEvent(SchedulerListener.SchedulerEventType.UNEXPECTED_ERROR);
                }
            });
        } else {
            LOG.trace("No dead executions found.");
        }
        this.schedulerListeners.onSchedulerEvent(SchedulerListener.SchedulerEventType.RAN_DETECT_DEAD);
    }

    void updateHeartbeats() {
        List<CurrentlyExecuting> currentlyProcessing = this.executor.getCurrentlyExecuting();
        if (currentlyProcessing.isEmpty()) {
            LOG.trace("No executions to update heartbeats for. Skipping.");
            return;
        }
        LOG.debug("Updating heartbeats for {} executions being processed.", (Object)currentlyProcessing.size());
        Instant now = this.clock.now();
        currentlyProcessing.forEach(execution -> this.updateHeartbeatForExecution(now, (CurrentlyExecuting)execution));
        this.schedulerListeners.onSchedulerEvent(SchedulerListener.SchedulerEventType.RAN_UPDATE_HEARTBEATS);
    }

    protected void updateHeartbeatForExecution(Instant now, CurrentlyExecuting currentlyExecuting) {
        Execution e = currentlyExecuting.getExecution();
        LOG.trace("Updating heartbeat for execution: " + e);
        try {
            HeartbeatState heartbeatState;
            boolean successfulHeartbeat = this.schedulerTaskRepository.updateHeartbeatWithRetry(e, now, 3);
            currentlyExecuting.heartbeat(successfulHeartbeat, now);
            if (!successfulHeartbeat) {
                this.schedulerListeners.onSchedulerEvent(SchedulerListener.SchedulerEventType.FAILED_HEARTBEAT);
                this.schedulerListeners.onExecutionFailedHeartbeat(currentlyExecuting);
            }
            if ((heartbeatState = currentlyExecuting.getHeartbeatState()).getFailedHeartbeats() > 1) {
                LOG.warn("Execution has more than 1 failed heartbeats. Should not happen. Risk of being considered dead. See heartbeat-state. Heartbeat-state={}, Execution={}", (Object)heartbeatState.describe(), (Object)e);
                this.schedulerListeners.onSchedulerEvent(SchedulerListener.SchedulerEventType.FAILED_MULTIPLE_HEARTBEATS);
            }
        }
        catch (Throwable ex) {
            LOG.error("Unexpteced failure while while updating heartbeat for execution {}.", (Object)e, (Object)ex);
            this.schedulerListeners.onSchedulerEvent(SchedulerListener.SchedulerEventType.FAILED_HEARTBEAT);
            this.schedulerListeners.onSchedulerEvent(SchedulerListener.SchedulerEventType.UNEXPECTED_ERROR);
            this.schedulerListeners.onExecutionFailedHeartbeat(currentlyExecuting);
        }
    }

    Duration getMaxAgeBeforeConsideredDead() {
        return this.heartbeatInterval.multipliedBy(this.numberOfMissedHeartbeatsBeforeDead);
    }

    public static SchedulerBuilder create(DataSource dataSource, Task<?> ... knownTasks) {
        return Scheduler.create(dataSource, Arrays.asList(knownTasks));
    }

    public static SchedulerBuilder create(DataSource dataSource, List<Task<?>> knownTasks) {
        return new SchedulerBuilder(dataSource, knownTasks);
    }
}

