/*
 * Decompiled with CFR 0.152.
 */
package com.github.kagkarlsson.scheduler;

import com.github.kagkarlsson.scheduler.Clock;
import com.github.kagkarlsson.scheduler.DueExecutionsBatch;
import com.github.kagkarlsson.scheduler.ExecutePicked;
import com.github.kagkarlsson.scheduler.Executor;
import com.github.kagkarlsson.scheduler.PollStrategy;
import com.github.kagkarlsson.scheduler.PollingStrategyConfig;
import com.github.kagkarlsson.scheduler.SchedulerClient;
import com.github.kagkarlsson.scheduler.SchedulerState;
import com.github.kagkarlsson.scheduler.TaskRepository;
import com.github.kagkarlsson.scheduler.TaskResolver;
import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.Execution;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FetchCandidates
implements PollStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(FetchCandidates.class);
    private final Executor executor;
    private final TaskRepository taskRepository;
    private final SchedulerClient schedulerClient;
    private final StatsRegistry statsRegistry;
    private final SchedulerState schedulerState;
    private final ConfigurableLogger failureLogger;
    private final TaskResolver taskResolver;
    private final Clock clock;
    private final PollingStrategyConfig pollingStrategyConfig;
    private final Runnable triggerCheckForNewExecutions;
    AtomicInteger currentGenerationNumber = new AtomicInteger(0);
    private final int lowerLimit;
    private final int upperLimit;

    public FetchCandidates(Executor executor, TaskRepository taskRepository, SchedulerClient schedulerClient, int threadpoolSize, StatsRegistry statsRegistry, SchedulerState schedulerState, ConfigurableLogger failureLogger, TaskResolver taskResolver, Clock clock, PollingStrategyConfig pollingStrategyConfig, Runnable triggerCheckForNewExecutions) {
        this.executor = executor;
        this.taskRepository = taskRepository;
        this.schedulerClient = schedulerClient;
        this.statsRegistry = statsRegistry;
        this.schedulerState = schedulerState;
        this.failureLogger = failureLogger;
        this.taskResolver = taskResolver;
        this.clock = clock;
        this.pollingStrategyConfig = pollingStrategyConfig;
        this.triggerCheckForNewExecutions = triggerCheckForNewExecutions;
        this.lowerLimit = pollingStrategyConfig.getLowerLimit(threadpoolSize);
        this.upperLimit = pollingStrategyConfig.getUpperLimit(threadpoolSize);
    }

    @Override
    public void run() {
        Instant now = this.clock.now();
        int executionsToFetch = this.upperLimit;
        List<Execution> fetchedDueExecutions = this.taskRepository.getDue(now, executionsToFetch);
        LOG.trace("Fetched {} task instances due for execution", (Object)fetchedDueExecutions.size());
        this.currentGenerationNumber.incrementAndGet();
        DueExecutionsBatch newDueBatch = new DueExecutionsBatch(this.currentGenerationNumber.get(), fetchedDueExecutions.size(), executionsToFetch == fetchedDueExecutions.size(), leftInBatch -> leftInBatch <= this.lowerLimit);
        for (Execution e : fetchedDueExecutions) {
            this.executor.addToQueue(() -> {
                Object candidate = new PickDue(e, newDueBatch).call();
                ((Optional)candidate).ifPresent(picked -> new ExecutePicked(this.executor, this.taskRepository, this.schedulerClient, this.statsRegistry, this.taskResolver, this.schedulerState, this.failureLogger, this.clock, (Execution)picked).run());
            }, () -> newDueBatch.oneExecutionDone(this.triggerCheckForNewExecutions::run));
        }
        this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_EXECUTE_DUE);
    }

    private class PickDue
    implements Callable<Optional<Execution>> {
        private final Execution candidate;
        private final DueExecutionsBatch addedDueExecutionsBatch;

        public PickDue(Execution candidate, DueExecutionsBatch dueExecutionsBatch) {
            this.candidate = candidate;
            this.addedDueExecutionsBatch = dueExecutionsBatch;
        }

        @Override
        public Optional<Execution> call() {
            if (FetchCandidates.this.schedulerState.isShuttingDown()) {
                LOG.info("Scheduler has been shutdown. Skipping fetched due execution: " + this.candidate.taskInstance.getTaskAndInstance());
                return Optional.empty();
            }
            if (this.addedDueExecutionsBatch.isOlderGenerationThan(FetchCandidates.this.currentGenerationNumber.get())) {
                this.addedDueExecutionsBatch.markBatchAsStale();
                FetchCandidates.this.statsRegistry.register(StatsRegistry.CandidateStatsEvent.STALE);
                LOG.trace("Skipping queued execution (current generationNumber: {}, execution generationNumber: {})", (Object)FetchCandidates.this.currentGenerationNumber, (Object)this.addedDueExecutionsBatch.getGenerationNumber());
                return Optional.empty();
            }
            Optional<Execution> pickedExecution = FetchCandidates.this.taskRepository.pick(this.candidate, FetchCandidates.this.clock.now());
            if (!pickedExecution.isPresent()) {
                LOG.debug("Execution picked by another scheduler. Continuing to next due execution.");
                FetchCandidates.this.statsRegistry.register(StatsRegistry.CandidateStatsEvent.ALREADY_PICKED);
                return Optional.empty();
            }
            return pickedExecution;
        }
    }
}

