/*
 * Decompiled with CFR 0.152.
 */
package io.digdag.core.schedule;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import io.digdag.client.config.Config;
import io.digdag.client.config.ConfigFactory;
import io.digdag.core.BackgroundExecutor;
import io.digdag.core.ErrorReporter;
import io.digdag.core.database.TransactionManager;
import io.digdag.core.log.LogMarkers;
import io.digdag.core.repository.ProjectStoreManager;
import io.digdag.core.repository.ResourceConflictException;
import io.digdag.core.repository.ResourceLimitExceededException;
import io.digdag.core.repository.ResourceNotFoundException;
import io.digdag.core.repository.StoredWorkflowDefinitionWithProject;
import io.digdag.core.schedule.ImmutableStoredSchedule;
import io.digdag.core.schedule.ScheduleConfig;
import io.digdag.core.schedule.ScheduleControl;
import io.digdag.core.schedule.ScheduleStoreManager;
import io.digdag.core.schedule.SchedulerManager;
import io.digdag.core.schedule.StoredSchedule;
import io.digdag.core.session.AttemptStateFlags;
import io.digdag.core.session.DelayedAttemptControlStore;
import io.digdag.core.session.ImmutableStoredSessionAttempt;
import io.digdag.core.session.ResumingTask;
import io.digdag.core.session.Session;
import io.digdag.core.session.SessionMonitor;
import io.digdag.core.session.SessionStore;
import io.digdag.core.session.SessionStoreManager;
import io.digdag.core.session.StoredDelayedSessionAttempt;
import io.digdag.core.session.StoredSessionAttemptWithSession;
import io.digdag.core.workflow.AttemptBuilder;
import io.digdag.core.workflow.AttemptRequest;
import io.digdag.core.workflow.SessionAttemptConflictException;
import io.digdag.core.workflow.WorkflowExecutor;
import io.digdag.spi.ScheduleTime;
import io.digdag.spi.Scheduler;
import io.digdag.spi.metrics.DigdagMetrics;
import io.digdag.util.DurationParam;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Background executor that polls for ready schedules and ready delayed
 * attempts and turns them into workflow submissions.
 *
 * <p>When scheduling is enabled, {@link #start()} creates a single daemon
 * thread (named {@code scheduler-%d}) that invokes the schedule loop and the
 * delayed-attempt loop with a fixed delay of one second each. The class also
 * exposes operations to skip schedules and to backfill past schedule times.
 */
public class ScheduleExecutor
implements BackgroundExecutor {
    private static final Logger logger = LoggerFactory.getLogger(ScheduleExecutor.class);
    // Parameter names listed as built-in schedule options. Within this class only
    // "skip_on_overtime" and "skip_delayed_by" are read (see runSchedule);
    // presumably "start" and "end" are consumed elsewhere -- TODO confirm.
    public static final List<String> BUILT_IN_SCHEDULE_PARAMS = Arrays.asList("skip_on_overtime", "skip_delayed_by", "start", "end");
    // Used to look up workflow definitions (with their project) by id.
    private final ProjectStoreManager rm;
    // Used to lock ready schedules and to access per-site schedule stores.
    private final ScheduleStoreManager sm;
    // Resolves the Scheduler for a workflow definition.
    private final SchedulerManager srm;
    // Wraps the polling work in transactions; also reset after a resource-limit failure.
    private final TransactionManager tm;
    // Used to access session stores and to lock ready delayed attempts.
    private final SessionStoreManager sessionStoreManager;
    // Builds AttemptRequest objects from stored workflows.
    private final AttemptBuilder attemptBuilder;
    // Submits workflows / delayed attempts and stores tasks.
    private final WorkflowExecutor workflowExecutor;
    // Creates the empty Config passed to the attempt builder.
    private final ConfigFactory cf;
    // Its getEnabled() flag decides whether start() launches the polling thread.
    private final ScheduleConfig scheduleConfig;
    // Null until start() creates it and again after shutdown(); accessed under the instance lock there.
    private ScheduledExecutorService executor;
    // Optional injection; falls back to ErrorReporter.empty() when nothing is injected.
    @Inject(optional=true)
    private ErrorReporter errorReporter = ErrorReporter.empty();
    // Field-injected; used to count uncaught errors from the polling loops.
    @Inject
    private DigdagMetrics metrics;

    /**
     * Creates the executor from its injected collaborators. Only stores the
     * references; no thread is started until {@link #start()} is called.
     *
     * @param rm                  source of workflow definitions
     * @param sm                  source of schedule stores and ready schedules
     * @param srm                 resolves the {@link Scheduler} of a workflow
     * @param tm                  transaction manager wrapping the polling work
     * @param sessionStoreManager source of session stores and delayed attempts
     * @param attemptBuilder      builds attempt requests from stored workflows
     * @param workflowExecutor    submits workflows and stores tasks
     * @param cf                  factory for the config passed to the attempt builder
     * @param scheduleConfig      scheduler settings, including the enabled flag
     */
    @Inject
    public ScheduleExecutor(
            ProjectStoreManager rm,
            ScheduleStoreManager sm,
            SchedulerManager srm,
            TransactionManager tm,
            SessionStoreManager sessionStoreManager,
            AttemptBuilder attemptBuilder,
            WorkflowExecutor workflowExecutor,
            ConfigFactory cf,
            ScheduleConfig scheduleConfig) {
        // Store and scheduler lookups.
        this.rm = rm;
        this.sm = sm;
        this.srm = srm;
        this.sessionStoreManager = sessionStoreManager;

        // Transaction and execution machinery.
        this.tm = tm;
        this.workflowExecutor = workflowExecutor;
        this.attemptBuilder = attemptBuilder;

        // Configuration.
        this.cf = cf;
        this.scheduleConfig = scheduleConfig;
    }

    /**
     * Reports whether the polling executor currently exists.
     *
     * @return {@code true} while the executor field is non-null
     */
    @VisibleForTesting
    boolean isStarted() {
        final boolean hasExecutor = null != this.executor;
        return hasExecutor;
    }

    /**
     * Starts the background polling loops when scheduling is enabled.
     *
     * <p>Creates a single-threaded daemon executor and registers two tasks
     * on it, each run with a fixed delay of one second: the schedule loop and
     * the delayed-attempt loop. When scheduling is disabled only a debug
     * message is logged.
     *
     * <p>The call is idempotent: if the executor already exists, nothing is
     * registered again. Previously a repeated call registered both polling
     * tasks a second time on the same executor.
     */
    @PostConstruct
    public synchronized void start() {
        if (!this.scheduleConfig.getEnabled()) {
            logger.debug("Scheduler is disabled.");
            return;
        }
        if (this.executor != null) {
            // Already started; registering the tasks again would duplicate the polling.
            return;
        }
        this.executor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("scheduler-%d").build());
        this.executor.scheduleWithFixedDelay(() -> this.runSchedules(), 1L, 1L, TimeUnit.SECONDS);
        this.executor.scheduleWithFixedDelay(() -> this.runDelayedAttempts(), 1L, 1L, TimeUnit.SECONDS);
    }

    /**
     * Stops the polling executor if one exists and clears the reference.
     * Calling it when no executor exists is a no-op.
     */
    @PreDestroy
    public synchronized void shutdown() {
        final ScheduledExecutorService running = this.executor;
        if (running == null) {
            return;
        }
        // Orderly shutdown first; the reference is cleared only afterwards.
        running.shutdown();
        this.executor = null;
    }

    /**
     * Eager shutdown hook of {@link BackgroundExecutor}; simply delegates to
     * {@link #shutdown()}.
     */
    @Override
    public void eagerShutdown() {
        shutdown();
    }

    /** Runs the schedule loop using the current wall-clock time. */
    private void runSchedules() {
        final Instant currentTime = Instant.now();
        this.runSchedules(currentTime);
    }

    /**
     * Processes ready schedules one at a time until none is left for
     * {@code now}. Any throwable is logged, reported and counted, never
     * rethrown, so the caller is not interrupted by a failing iteration.
     *
     * @param now the instant used to decide which schedules are ready
     */
    private void runSchedules(Instant now) {
        try {
            boolean processedOne;
            do {
                processedOne = this.runScheduleOnce(now);
            } while (processedOne);
        }
        catch (Throwable t) {
            logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "An uncaught exception is ignored. Scheduling will be retried.", t);
            this.errorReporter.reportUncaughtError(t);
            this.metrics.increment(DigdagMetrics.Category.DEFAULT, "uncaughtErrors");
        }
    }

    /**
     * Locks at most one ready schedule inside a transaction and runs it.
     *
     * @param now the instant used to decide which schedules are ready
     * @return {@code true} if the store reported a positive count for this
     *         call, i.e. there may be more work to do
     */
    @VisibleForTesting
    boolean runScheduleOnce(Instant now) {
        final int lockedCount = this.tm.begin(() -> {
            return this.sm.lockReadySchedules(now, 1, (store, storedSchedule) -> {
                ScheduleControl lockedSched = new ScheduleControl(store, storedSchedule);
                this.runSchedule(lockedSched, now);
            });
        });
        return 0 < lockedCount;
    }

    /** Runs the delayed-attempt loop using the current wall-clock time. */
    private void runDelayedAttempts() {
        final Instant currentTime = Instant.now();
        this.runDelayedAttempts(currentTime);
    }

    /**
     * Locks the delayed attempts that are ready at {@code now} inside a
     * transaction and hands each one to
     * {@link #runDelayedAttempt(DelayedAttemptControlStore, StoredDelayedSessionAttempt)}.
     * Any throwable is logged, reported and counted, never rethrown.
     *
     * @param now the instant used to decide which delayed attempts are ready
     */
    @VisibleForTesting
    void runDelayedAttempts(Instant now) {
        try {
            this.tm.begin(() -> {
                this.sessionStoreManager.lockReadyDelayedAttempts(now, this::runDelayedAttempt);
                return null;
            });
        }
        catch (Throwable t) {
            logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "An uncaught exception is ignored. Submitting delayed attempts will be retried.", t);
            this.errorReporter.reportUncaughtError(t);
            this.metrics.increment(DigdagMetrics.Category.DEFAULT, "uncaughtErrors");
        }
    }

    /**
     * Handles one locked schedule: decides whether to skip or submit it,
     * computes the next {@link ScheduleTime}, and stores that time.
     *
     * <p>Decision order, as implemented below:
     * <ol>
     *   <li>{@code skip_delayed_by} is set and {@code now} is later than the
     *       stored next run time plus that duration: skip to the next schedule time.</li>
     *   <li>An active attempt exists and {@code skip_on_overtime} is true: skip.</li>
     *   <li>Otherwise submit via {@link #startSchedule}; a resource-limit failure
     *       postpones the run by 10 minutes, a resource conflict skips to the
     *       next schedule time.</li>
     * </ol>
     * A {@link ResourceNotFoundException} or any {@link RuntimeException} raised
     * while deciding postpones the run by one hour instead of propagating.
     *
     * @param lockedSched control object of the schedule being processed
     * @param now         the instant the polling iteration runs for
     * @throws IllegalStateException if storing the next time reports that the
     *         schedule's workflow does not exist
     */
    private void runSchedule(ScheduleControl lockedSched, Instant now) {
        StoredSchedule sched = lockedSched.get();
        ScheduleTime nextSchedule;
        // Stays null unless startSchedule() returned normally for this schedule time.
        Instant successfulSessionTime = null;
        try {
            StoredWorkflowDefinitionWithProject def = this.rm.getWorkflowDetailsById(sched.getWorkflowDefinitionId());
            SessionStore ss = this.sessionStoreManager.getSessionStore(def.getProject().getSiteId());
            List<StoredSessionAttemptWithSession> activeAttempts = ss.getActiveAttemptsOfWorkflow(def.getProject().getId(), def.getName(), 1, Optional.<Long>absent());
            Scheduler sr = this.srm.getScheduler(def);
            // Named differently from the scheduleConfig field to avoid shadowing it.
            Config workflowScheduleConfig = SchedulerManager.getScheduleConfig(def);
            boolean skipOnOvertime = workflowScheduleConfig.get("skip_on_overtime", boolean.class, false);
            Optional<DurationParam> skipDelay = workflowScheduleConfig.getOptional("skip_delayed_by", DurationParam.class);
            if (skipDelay.isPresent() && now.isAfter(sched.getNextRunTime().plusSeconds(skipDelay.get().getDuration().getSeconds()))) {
                logger.info("Now={} is too late from scheduled time={}. It's over skip_delayed_by={}. Skipping this schedule: {}", now, sched.getNextScheduleTime(), skipDelay.get(), sched);
                nextSchedule = sr.nextScheduleTime(sched.getNextScheduleTime());
            } else if (!activeAttempts.isEmpty() && skipOnOvertime) {
                logger.info("An attempt of the scheduled workflow is still running and skip_on_overtime = true. Skipping this schedule: {}", sched);
                nextSchedule = sr.nextScheduleTime(sched.getNextScheduleTime());
            } else {
                try {
                    nextSchedule = this.startSchedule(sched, sr, def);
                    successfulSessionTime = sched.getNextScheduleTime();
                }
                catch (ResourceLimitExceededException ex) {
                    logger.info("Number of attempts or tasks exceed limit. Pending this schedule for 10 minutes: {}", sched, ex);
                    // Same schedule time, retried 10 minutes from now.
                    nextSchedule = ScheduleTime.of(sched.getNextScheduleTime(), ScheduleTime.alignedNow().plusSeconds(600L));
                }
                catch (ResourceConflictException ex) {
                    IllegalStateException error = new IllegalStateException("Detected duplicated excution of a scheduled workflow for the same scheduling time.", ex);
                    logger.error("Database state error during scheduling. Skipping this schedule: {}", sched, error);
                    nextSchedule = sr.nextScheduleTime(sched.getNextScheduleTime());
                }
            }
        }
        catch (ResourceNotFoundException ex) {
            logger.error("Database state error during scheduling. Pending this schedule for 1 hour: {}", sched, ex);
            // NOTE(review): unlike the sibling branches, this delay is measured from the stored
            // next run time rather than ScheduleTime.alignedNow() -- confirm this is intended.
            nextSchedule = ScheduleTime.of(sched.getNextScheduleTime(), sched.getNextRunTime().plusSeconds(3600L));
        }
        catch (RuntimeException ex) {
            logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "Error during scheduling. Pending this schedule for 1 hour: {}", sched, ex);
            nextSchedule = ScheduleTime.of(sched.getNextScheduleTime(), ScheduleTime.alignedNow().plusSeconds(3600L));
        }
        try {
            logger.info("Updating next schedule time: sched={}, next={}, lastSessionTime={}", sched, nextSchedule, successfulSessionTime);
            if (successfulSessionTime != null) {
                lockedSched.updateNextScheduleTimeAndLastSessionTime(nextSchedule, successfulSessionTime);
            } else {
                lockedSched.updateNextScheduleTime(nextSchedule);
            }
        }
        catch (ResourceNotFoundException ex) {
            throw new IllegalStateException("Workflow for a schedule id=" + sched.getId() + " is scheduled but does not exist.", ex);
        }
    }

    /**
     * Submits the workflow for the schedule's current schedule time and
     * returns the schedule time that follows it.
     *
     * <p>A {@link SessionAttemptConflictException} from the submission is
     * logged at debug level and otherwise ignored, so the following schedule
     * time is returned in that case too.
     *
     * @param sched the schedule whose next schedule/run times are used
     * @param sr    scheduler used to compute the following schedule time
     * @param def   workflow definition to submit
     * @return the schedule time after the one just handled
     */
    @VisibleForTesting
    ScheduleTime startSchedule(StoredSchedule sched, Scheduler sr, StoredWorkflowDefinitionWithProject def) throws ResourceNotFoundException, ResourceConflictException, ResourceLimitExceededException {
        final Instant sessionTime = sched.getNextScheduleTime();
        final ScheduleTime current = ScheduleTime.of(sessionTime, sched.getNextRunTime());
        final Optional<String> noRetryName = Optional.absent();
        final AttemptRequest request = this.newAttemptRequest(def, current, noRetryName, sched.getLastSessionTime());
        try {
            this.workflowExecutor.submitWorkflow(def.getProject().getSiteId(), request, def);
        }
        catch (SessionAttemptConflictException conflict) {
            logger.debug("Scheduled attempt {} is already executed. Skipping", conflict.getConflictedSession());
        }
        return sr.nextScheduleTime(sessionTime);
    }

    /**
     * Moves a schedule forward to the first schedule time the scheduler
     * yields for {@code nextTime}.
     *
     * @param siteId   site whose schedule store is used
     * @param schedId  id of the schedule to update
     * @param nextTime time passed to the scheduler to find the target schedule time
     * @param runTime  if present, replaces the run time of the target schedule time
     * @param dryRun   when {@code true} the store is not updated; the would-be
     *                 result is still returned
     * @return a copy of the schedule carrying the new schedule and run times
     * @throws ResourceConflictException if the schedule's current next schedule
     *         time is not before the target time
     */
    public StoredSchedule skipScheduleToTime(int siteId, int schedId, Instant nextTime, Optional<Instant> runTime, boolean dryRun) throws ResourceNotFoundException, ResourceConflictException {
        return this.sm.getScheduleStore(siteId).updateScheduleById(schedId, (store, sched) -> {
            ScheduleControl lockedSched = new ScheduleControl(store, sched);
            ScheduleTime aligned = this.getSchedulerOfSchedule(sched).getFirstScheduleTime(nextTime);

            // Only forward moves are allowed.
            if (!sched.getNextScheduleTime().isBefore(aligned.getTime())) {
                throw new ResourceConflictException("Specified time to skip schedules is already past");
            }

            ScheduleTime target = runTime.isPresent()
                    ? ScheduleTime.of(aligned.getTime(), runTime.get())
                    : aligned;
            StoredSchedule preview = ScheduleExecutor.copyWithUpdatedScheduleTime(sched, target);
            if (!dryRun) {
                lockedSched.updateNextScheduleTime(target);
            }
            return preview;
        });
    }

    /**
     * Moves a schedule forward by {@code count} schedule times, counted from
     * the first schedule time the scheduler yields for {@code currentTime}.
     *
     * @param siteId      site whose schedule store is used
     * @param schedId     id of the schedule to update
     * @param currentTime time passed to the scheduler to find the starting schedule time
     * @param count       number of times the scheduler is advanced from that start
     * @param runTime     if present, replaces the run time of the target schedule time
     * @param dryRun      when {@code true} the store is not updated; the would-be
     *                    result is still returned
     * @return a copy of the schedule carrying the new schedule and run times
     * @throws ResourceConflictException if the schedule's current next schedule
     *         time is not before the target time
     */
    public StoredSchedule skipScheduleByCount(int siteId, int schedId, Instant currentTime, int count, Optional<Instant> runTime, boolean dryRun) throws ResourceNotFoundException, ResourceConflictException {
        return this.sm.getScheduleStore(siteId).updateScheduleById(schedId, (store, sched) -> {
            ScheduleControl lockedSched = new ScheduleControl(store, sched);
            Scheduler scheduler = this.getSchedulerOfSchedule(sched);

            // Step forward `count` times from the first schedule time.
            ScheduleTime cursor = scheduler.getFirstScheduleTime(currentTime);
            int stepsLeft = count;
            while (stepsLeft > 0) {
                cursor = scheduler.nextScheduleTime(cursor.getTime());
                stepsLeft--;
            }

            // Only forward moves are allowed.
            if (!sched.getNextScheduleTime().isBefore(cursor.getTime())) {
                throw new ResourceConflictException("Specified time to skip schedules is already past");
            }

            ScheduleTime target = runTime.isPresent()
                    ? ScheduleTime.of(cursor.getTime(), runTime.get())
                    : cursor;
            StoredSchedule preview = ScheduleExecutor.copyWithUpdatedScheduleTime(sched, target);
            if (!dryRun) {
                lockedSched.updateNextScheduleTime(target);
            }
            return preview;
        });
    }

    /**
     * Builds a copy of {@code sched} whose next run time and next schedule
     * time are taken from {@code nextTime}; the input is not modified here.
     */
    private static StoredSchedule copyWithUpdatedScheduleTime(StoredSchedule sched, ScheduleTime nextTime) {
        final Instant updatedRunTime = nextTime.getRunTime();
        final Instant updatedScheduleTime = nextTime.getTime();
        return ImmutableStoredSchedule.builder()
                .from(sched)
                .nextRunTime(updatedRunTime)
                .nextScheduleTime(updatedScheduleTime)
                .build();
    }

    /**
     * Looks up the workflow definition referenced by {@code sched} and
     * returns the scheduler resolved for it.
     *
     * @throws ResourceNotFoundException propagated from the definition lookup
     */
    private Scheduler getSchedulerOfSchedule(StoredSchedule sched) throws ResourceNotFoundException {
        return this.srm.getScheduler(this.rm.getWorkflowDetailsById(sched.getWorkflowDefinitionId()));
    }

    /**
     * Submits attempts for past schedule times of a schedule.
     *
     * <p>While holding the schedule lock, collects the schedule times from the
     * first one the scheduler yields for {@code fromTime} up to (excluding)
     * the schedule's next schedule time, optionally limited to {@code count}
     * entries. It then verifies that no attempt named {@code attemptName}
     * exists for any of them, and submits one delayed attempt per time in a
     * single submit transaction. Each attempt after the first is submitted
     * with the session id of the previously submitted one.
     *
     * @param siteId      site whose stores are used
     * @param schedId     id of the schedule to backfill
     * @param fromTime    time passed to the scheduler to find the first schedule time
     * @param attemptName retry attempt name given to every submitted attempt
     * @param count       if present, the exact number of schedule times to backfill
     * @param dryRun      when {@code true} nothing is submitted; placeholder
     *                    attempts (id 0) are returned instead
     * @return the submitted (or, for a dry run, placeholder) attempts in schedule-time order
     * @throws ResourceConflictException if an attempt with {@code attemptName}
     *         already exists for one of the schedule times
     * @throws IllegalArgumentException if {@code count} is present and larger
     *         than the number of schedule times before the next schedule time
     */
    public List<StoredSessionAttemptWithSession> backfill(int siteId, int schedId, Instant fromTime, String attemptName, Optional<Integer> count, boolean dryRun) throws ResourceNotFoundException, ResourceConflictException, ResourceLimitExceededException {
        SessionStore ss = this.sessionStoreManager.getSessionStore(siteId);
        return this.sm.getScheduleStore(siteId).lockScheduleById(schedId, (store, sched) -> {
            StoredWorkflowDefinitionWithProject def = this.rm.getWorkflowDetailsById(sched.getWorkflowDefinitionId());
            Scheduler sr = this.srm.getScheduler(def);
            boolean useCount = count.isPresent();
            int remaining = count.or(0);

            // Collect the schedule times to backfill, oldest first.
            List<Instant> instants = new ArrayList<>();
            Instant time = sr.getFirstScheduleTime(fromTime).getTime();
            while (time.isBefore(sched.getNextScheduleTime())) {
                if (useCount) {
                    if (remaining <= 0) {
                        break;
                    }
                    --remaining;
                }
                instants.add(time);
                time = sr.nextScheduleTime(time).getTime();
            }
            if (useCount && remaining > 0) {
                throw new IllegalArgumentException(String.format(Locale.ENGLISH, "count is set to %d but there are only %d attempts until the next schedule time", count.get(), count.get() - remaining));
            }

            // Refuse to backfill if any of the times already has an attempt with this name.
            for (Instant instant : instants) {
                try {
                    ss.getAttemptByName(def.getProject().getId(), def.getName(), instant, attemptName);
                    throw new ResourceConflictException(String.format(Locale.ENGLISH, "Attempt of project id=%d workflow=%s instant=%s attempt name=%s already exists", def.getProject().getId(), def.getName(), instant, attemptName));
                }
                catch (ResourceNotFoundException ignored) {
                    // Expected case: the lookup found no attempt with this name for the instant.
                }
            }

            return this.workflowExecutor.submitTransaction(siteId, submitter -> {
                ImmutableList.Builder<StoredSessionAttemptWithSession> attempts = ImmutableList.builder();
                Optional<StoredSessionAttemptWithSession> lastAttempt = Optional.absent();
                for (Instant instant : instants) {
                    if (dryRun) {
                        // Placeholder attempt; nothing is written for a dry run.
                        attempts.add(StoredSessionAttemptWithSession.dryRunDummy(
                                siteId,
                                Session.of(def.getProject().getId(), def.getName(), instant),
                                ImmutableStoredSessionAttempt.builder()
                                        .retryAttemptName(Optional.of(attemptName))
                                        .workflowDefinitionId(Optional.of(def.getId()))
                                        .timeZone(def.getTimeZone())
                                        .id(0L)
                                        .index(0)
                                        .params(def.getConfig().getFactory().create())
                                        .stateFlags(AttemptStateFlags.empty())
                                        .sessionId(0L)
                                        .createdAt(Instant.now())
                                        .finishedAt(Optional.<Instant>absent())
                                        .build()));
                        continue;
                    }
                    // Prefer the session time of the attempt submitted just before this one;
                    // for the first attempt fall back to what the submitter reports.
                    Optional<Instant> lastExecutedSessionTime = lastAttempt.transform(a -> a.getSession().getSessionTime());
                    if (!lastExecutedSessionTime.isPresent()) {
                        lastExecutedSessionTime = submitter.getLastExecutedSessionTime(sched.getProjectId(), sched.getWorkflowName(), instant);
                    }
                    AttemptRequest ar = this.newAttemptRequest(def, ScheduleTime.of(instant, sched.getNextScheduleTime()), Optional.of(attemptName), lastExecutedSessionTime);
                    StoredSessionAttemptWithSession attempt = submitter.submitDelayedAttempt(ar, lastAttempt.transform(a -> a.getSessionId()));
                    lastAttempt = Optional.of(attempt);
                    attempts.add(attempt);
                }
                return attempts.build();
            });
        });
    }

    /**
     * Starts one delayed attempt: locks its session, loads the stored
     * workflow definition and stores the attempt's tasks.
     *
     * <p>Outcome handling:
     * <ul>
     *   <li>success, conflict, or not-found: the delayed attempt is marked
     *       completed (the latter two after a warning);</li>
     *   <li>resource limit exceeded: the transaction manager is reset and the
     *       delayed attempt is postponed by 300 seconds instead of completed.</li>
     * </ul>
     *
     * @param control        store used to lock, delay and complete the delayed attempt
     * @param delayedAttempt the delayed attempt to start
     */
    public void runDelayedAttempt(DelayedAttemptControlStore control, StoredDelayedSessionAttempt delayedAttempt) {
        try {
            control.lockSessionOfAttempt(delayedAttempt.getAttemptId(), (sessionControlStore, storedAttemptWithSession) -> {
                Optional<Long> definitionId = storedAttemptWithSession.getWorkflowDefinitionId();
                if (!definitionId.isPresent()) {
                    throw new ResourceNotFoundException("Delayed attempt must have a stored workflow");
                }
                StoredWorkflowDefinitionWithProject def = this.rm
                        .getProjectStore(storedAttemptWithSession.getSiteId())
                        .getWorkflowDefinitionById(definitionId.get());
                List<ResumingTask> noResumingTasks = ImmutableList.of();
                List<SessionMonitor> noMonitors = ImmutableList.of();
                this.workflowExecutor.storeTasks(sessionControlStore, storedAttemptWithSession, def, noResumingTasks, noMonitors);
                return true;
            });
        }
        catch (ResourceConflictException ex) {
            logger.warn("Delayed attempt conflicted: {}", delayedAttempt, ex);
        }
        catch (ResourceNotFoundException ex) {
            logger.warn("Invalid delayed attempt: {}", delayedAttempt, ex);
        }
        catch (ResourceLimitExceededException ex) {
            this.tm.reset();
            logger.warn("Failed to start delayed attempt Due to too many active tasks. Will be retried after 5 minutes.", ex);
            control.delayDelayedAttempt(delayedAttempt.getAttemptId(), Instant.now().plusSeconds(300L));
            // Postponed, not finished: skip the completion below.
            return;
        }
        control.completeDelayedAttempt(delayedAttempt.getAttemptId());
    }

    /**
     * Builds an {@link AttemptRequest} for a stored workflow using a freshly
     * created config, an absent {@code Optional<Long>} and an empty
     * {@code List<Long>} for the builder's remaining arguments.
     *
     * @param def                     workflow definition the attempt is built from
     * @param time                    schedule time of the attempt
     * @param retryAttemptName        optional retry attempt name
     * @param lastExecutedSessionTime optional session time of the last executed session
     * @return the request produced by the attempt builder
     */
    private AttemptRequest newAttemptRequest(StoredWorkflowDefinitionWithProject def, ScheduleTime time, Optional<String> retryAttemptName, Optional<Instant> lastExecutedSessionTime) {
        final Config freshConfig = this.cf.create();
        final Optional<Long> absentId = Optional.absent();
        final List<Long> emptyIds = ImmutableList.of();
        return this.attemptBuilder.buildFromStoredWorkflow(def, freshConfig, time, retryAttemptName, absentId, emptyIds, lastExecutedSessionTime);
    }
}

