/*
 * Decompiled with CFR 0.152.
 */
package io.digdag.core.database;

import com.google.common.base.Optional;
import com.google.inject.Inject;
import io.digdag.core.database.BasicDatabaseStoreManager;
import io.digdag.core.database.ConfigMapper;
import io.digdag.core.database.DatabaseConfig;
import io.digdag.core.database.TransactionManager;
import io.digdag.core.repository.ResourceConflictException;
import io.digdag.core.repository.ResourceLimitExceededException;
import io.digdag.core.repository.ResourceNotFoundException;
import io.digdag.core.schedule.ImmutableStoredSchedule;
import io.digdag.core.schedule.ScheduleControlStore;
import io.digdag.core.schedule.ScheduleStore;
import io.digdag.core.schedule.ScheduleStoreManager;
import io.digdag.core.schedule.StoredSchedule;
import io.digdag.spi.ScheduleTime;
import io.digdag.spi.ac.AccessController;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.sqlobject.Bind;
import org.skife.jdbi.v2.sqlobject.SqlQuery;
import org.skife.jdbi.v2.sqlobject.SqlUpdate;
import org.skife.jdbi.v2.sqlobject.customizers.Define;
import org.skife.jdbi.v2.sqlobject.stringtemplate.UseStringTemplate3StatementLocator;
import org.skife.jdbi.v2.tweak.ResultSetMapper;

public class DatabaseScheduleStoreManager
extends BasicDatabaseStoreManager<Dao>
implements ScheduleStoreManager {
    /**
     * Creates the store manager, choosing the DAO interface that matches the
     * configured database type.
     *
     * @param transactionManager transaction manager handed to the base store manager
     * @param cfm config mapper handed to the base store manager
     * @param config database configuration; only {@code getType()} is read here
     * @throws IllegalArgumentException if the configured type is neither
     *         {@code "postgresql"} nor {@code "h2"}
     */
    @Inject
    public DatabaseScheduleStoreManager(TransactionManager transactionManager, ConfigMapper cfm, DatabaseConfig config) {
        super(
                config.getType(),
                dao(config.getType()),
                transactionManager,
                cfm);
    }

    /**
     * Maps a database type name to the DAO interface used for it.
     *
     * @param type database type name; {@code "postgresql"} and {@code "h2"} are recognized
     * @return the DAO interface for that database
     * @throws IllegalArgumentException if the type is not recognized
     */
    private static Class<? extends Dao> dao(String type) {
        // Receiver is "type" on purpose: a null type fails with NullPointerException,
        // exactly as a switch on a null string would.
        if (type.equals("postgresql")) {
            return PgDao.class;
        }
        if (type.equals("h2")) {
            return H2Dao.class;
        }
        throw new IllegalArgumentException("Unknown database type: " + type);
    }

    /**
     * Returns a schedule store whose queries are scoped to the given site.
     *
     * @param siteId id of the site the returned store is bound to
     * @return a new store instance for that site
     */
    @Override
    public ScheduleStore getScheduleStore(int siteId) {
        ScheduleStore siteStore = new DatabaseScheduleStore(siteId);
        return siteStore;
    }

    /**
     * Locks the schedules that are due at {@code currentTime} and runs {@code func}
     * on each of them inside a single transaction.
     *
     * <p>With a {@link PgDao} the ready schedules are selected and locked by one
     * {@code for update of s skip locked} query. With any other DAO the ready ids
     * are locked first ({@code for update}) and each schedule is then loaded by id.
     *
     * <p>A {@link RuntimeException} thrown by {@code func} does not stop the loop:
     * it is recorded and the remaining schedules are still processed. After the
     * transaction call returns, the first recorded exception is rethrown with all
     * later ones attached as suppressed exceptions.
     *
     * @param currentTime schedules whose {@code next_run_time} is at or before this instant
     *        (compared in epoch seconds) are considered ready
     * @param limit maximum number of schedules to lock
     * @param func callback invoked once per locked schedule
     * @return the number of schedules the callback was invoked for
     * @throws RuntimeException the first exception thrown by {@code func}, if any
     */
    @Override
    public int lockReadySchedules(Instant currentTime, int limit, ScheduleStoreManager.ScheduleAction func) {
        // Typed list: the rethrow loop below iterates RuntimeExceptions, which a raw list cannot provide.
        List<RuntimeException> exceptions = new ArrayList<>();
        long currentEpochSecond = currentTime.getEpochSecond();

        int count = this.transaction((handle, dao) -> {
            Stream<StoredSchedule> schedStream;
            if (dao instanceof PgDao) {
                schedStream = ((PgDao) dao).lockReadySchedulesSkipLocked(currentEpochSecond, limit).stream();
            }
            else {
                schedStream = dao.lockReadyScheduleIds(currentEpochSecond, limit)
                        .stream()
                        .map(dao::getScheduleByIdInternal);
            }
            return schedStream.mapToInt(sched -> {
                try {
                    func.schedule(new DatabaseScheduleControlStore(handle), sched);
                }
                catch (RuntimeException ex) {
                    // Keep going so one failing schedule does not block the others.
                    exceptions.add(ex);
                }
                // Every attempted schedule counts as one, whether or not its callback failed.
                return 1;
            }).sum();
        });

        if (!exceptions.isEmpty()) {
            RuntimeException first = exceptions.get(0);
            for (RuntimeException ex : exceptions.subList(1, exceptions.size())) {
                first.addSuppressed(ex);
            }
            throw first;
        }
        return count;
    }

    static class StoredScheduleMapper
    implements ResultSetMapper<StoredSchedule> {
        private final ConfigMapper cfm;

        public StoredScheduleMapper(ConfigMapper cfm) {
            this.cfm = cfm;
        }

        public StoredSchedule map(int index, ResultSet r, StatementContext ctx) throws SQLException {
            return ImmutableStoredSchedule.builder().id(r.getInt("id")).projectId(r.getInt("project_id")).workflowDefinitionId(r.getLong("workflow_definition_id")).nextRunTime(Instant.ofEpochSecond(r.getLong("next_run_time"))).nextScheduleTime(Instant.ofEpochSecond(r.getLong("next_schedule_time"))).workflowName(r.getString("name")).createdAt(BasicDatabaseStoreManager.getTimestampInstant(r, "created_at")).updatedAt(BasicDatabaseStoreManager.getTimestampInstant(r, "updated_at")).disabledAt(BasicDatabaseStoreManager.getOptionalTimestampInstant(r, "disabled_at")).lastSessionTime((Optional<Instant>)BasicDatabaseStoreManager.getOptionalLong(r, "last_session_time").transform(Instant::ofEpochSecond)).build();
        }
    }

    /**
     * SQL object interface for the {@code schedules} table. Every query that
     * returns a {@link StoredSchedule} joins {@code workflow_definitions} to
     * expose the workflow name as column {@code name}.
     */
    public interface Dao {
        /** Loads a schedule by id without any site restriction. */
        @SqlQuery("select s.*, wd.name as name from schedules s join workflow_definitions wd on wd.id = s.workflow_definition_id where s.id = :schedId")
        StoredSchedule getScheduleByIdInternal(@Bind("schedId") int schedId);

        /**
         * Lists schedules of a site with id greater than {@code lastId}, ordered by id,
         * at most {@code limit} rows. {@code acFilter} is a SQL condition placed at the
         * {@code <acFilter>} placeholder of the query.
         */
        @SqlQuery("select s.*, wd.name as name from schedules s join workflow_definitions wd on wd.id = s.workflow_definition_id join projects proj on proj.id = s.project_id where exists (select p.* from projects p where p.id = s.project_id and p.site_id = :siteId) and s.id > :lastId and <acFilter> order by s.id asc limit :limit")
        List<StoredSchedule> getSchedules(@Bind("siteId") int siteId, @Bind("limit") int limit, @Bind("lastId") int lastId, @Define("acFilter") String acFilter);

        /**
         * Same as {@link #getSchedules} but additionally restricted to one project.
         */
        @SqlQuery("select s.*, wd.name as name from schedules s join workflow_definitions wd on wd.id = s.workflow_definition_id join projects proj on proj.id = s.project_id where s.project_id = :projectId  and exists (select p.* from projects p where p.id = s.project_id and p.site_id = :siteId) and s.id > :lastId and <acFilter> order by s.id asc limit :limit")
        List<StoredSchedule> getSchedulesByProjectId(@Bind("siteId") int siteId, @Bind("projectId") int projectId, @Bind("limit") int limit, @Bind("lastId") int lastId, @Define("acFilter") String acFilter);

        /** Loads a schedule by id, only if its project belongs to the given site. */
        @SqlQuery("select s.*, wd.name as name from schedules s join workflow_definitions wd on wd.id = s.workflow_definition_id where s.id = :schedId and exists (select * from projects proj where proj.id = s.project_id and proj.site_id = :siteId)")
        StoredSchedule getScheduleById(@Bind("siteId") int siteId, @Bind("schedId") int schedId);

        /** Loads the schedule of a named workflow in a project, only if the project belongs to the given site. */
        @SqlQuery("select s.*, wd.name as name from schedules s join workflow_definitions wd on wd.id = s.workflow_definition_id where wd.name = :workflowName and s.project_id = :projectId and exists (select * from projects proj where proj.id = s.project_id and proj.site_id = :siteId)")
        StoredSchedule getScheduleByProjectIdAndWorkflowName(@Bind("siteId") int siteId, @Bind("projectId") int projectId, @Bind("workflowName") String workflowName);

        /**
         * Selects, with {@code for update}, ids of enabled schedules whose
         * {@code next_run_time} is at or before {@code currentTime}.
         */
        // NOTE(review): the "\\<=" escape presumably keeps "<" from being read as a template placeholder — confirm.
        @SqlQuery("select id from schedules where next_run_time \\<= :currentTime and disabled_at is null limit :limit for update")
        List<Integer> lockReadyScheduleIds(@Bind("currentTime") long currentTime, @Bind("limit") int limit);

        /** Selects one schedule row with {@code for update}; the result is mapped to an int. */
        @SqlQuery("select * from schedules where id = :id for update")
        int lockScheduleById(@Bind("id") long id);

        /** Sets the next run and schedule times; returns the number of updated rows. */
        @SqlUpdate("update schedules set next_run_time = :nextRunTime, next_schedule_time = :nextScheduleTime, updated_at = now() where id = :id")
        int updateNextScheduleTime(@Bind("id") int id, @Bind("nextRunTime") long nextRunTime, @Bind("nextScheduleTime") long nextScheduleTime);

        /** Sets the next run and schedule times plus the last session time; returns the number of updated rows. */
        @SqlUpdate("update schedules set next_run_time = :nextRunTime, next_schedule_time = :nextScheduleTime, last_session_time = :lastSessionTime, updated_at = now() where id = :id")
        int updateNextScheduleTime(@Bind("id") int id, @Bind("nextRunTime") long nextRunTime, @Bind("nextScheduleTime") long nextScheduleTime, @Bind("lastSessionTime") long lastSessionTime);

        /** Marks the schedule disabled as of now; returns the number of updated rows. */
        @SqlUpdate("update schedules set disabled_at = now(), updated_at = now() where id = :id")
        int disableSchedule(@Bind("id") int id);

        /** Clears the disabled marker; returns the number of updated rows. */
        @SqlUpdate("update schedules set disabled_at = null, updated_at = now() where id = :id")
        int enableSchedule(@Bind("id") int id);
    }

    /**
     * DAO used for the {@code "postgresql"} database type. Adds a query that
     * selects and locks ready schedules in one statement, skipping rows that
     * are already locked.
     */
    @UseStringTemplate3StatementLocator
    public interface PgDao
    extends Dao {
        /**
         * Selects, with {@code for update of s skip locked}, enabled schedules whose
         * {@code next_run_time} is at or before {@code currentTime}, at most {@code limit} rows.
         */
        @SqlQuery("select s.*, wd.name as name from schedules s join workflow_definitions wd on wd.id = s.workflow_definition_id where s.next_run_time \\<= :currentTime and s.disabled_at is null limit :limit for update of s skip locked")
        List<StoredSchedule> lockReadySchedulesSkipLocked(@Bind("currentTime") long currentTime, @Bind("limit") int limit);
    }

    /**
     * DAO used for the {@code "h2"} database type. Declares no queries of its
     * own; it only inherits those of {@link Dao}.
     */
    @UseStringTemplate3StatementLocator
    public interface H2Dao
    extends Dao {
    }

    /**
     * {@link ScheduleControlStore} that runs its statements through a DAO
     * attached to the {@link Handle} it was created with.
     */
    private static class DatabaseScheduleControlStore
    implements ScheduleControlStore {
        private final Handle handle;
        private final Dao dao;

        public DatabaseScheduleControlStore(Handle handle) {
            this.handle = handle;
            this.dao = handle.attach(Dao.class);
        }

        /**
         * Stores the next run time and next schedule time of a schedule.
         *
         * @throws ResourceNotFoundException if no row was updated
         */
        @Override
        public void updateNextScheduleTime(int schedId, ScheduleTime nextTime) throws ResourceNotFoundException {
            long nextRunEpoch = nextTime.getRunTime().getEpochSecond();
            long nextScheduleEpoch = nextTime.getTime().getEpochSecond();
            int updatedRows = this.dao.updateNextScheduleTime(schedId, nextRunEpoch, nextScheduleEpoch);
            requireUpdated(updatedRows, schedId);
        }

        /**
         * Stores the next run time, next schedule time and last session time of a schedule.
         *
         * @throws ResourceNotFoundException if no row was updated
         */
        @Override
        public void updateNextScheduleTimeAndLastSessionTime(int schedId, ScheduleTime nextTime, Instant lastSessionTime) throws ResourceNotFoundException {
            long nextRunEpoch = nextTime.getRunTime().getEpochSecond();
            long nextScheduleEpoch = nextTime.getTime().getEpochSecond();
            long lastSessionEpoch = lastSessionTime.getEpochSecond();
            int updatedRows = this.dao.updateNextScheduleTime(schedId, nextRunEpoch, nextScheduleEpoch, lastSessionEpoch);
            requireUpdated(updatedRows, schedId);
        }

        /** Disables the schedule; returns true if a row was updated. */
        @Override
        public boolean disableSchedule(int schedId) {
            return this.dao.disableSchedule(schedId) > 0;
        }

        /** Enables the schedule; returns true if a row was updated. */
        @Override
        public boolean enableSchedule(int schedId) {
            return this.dao.enableSchedule(schedId) > 0;
        }

        /** Loads the schedule by id without a site restriction. */
        @Override
        public StoredSchedule getScheduleById(int schedId) {
            return this.dao.getScheduleByIdInternal(schedId);
        }

        /** Fails with ResourceNotFoundException when an update touched no row. */
        private static void requireUpdated(int updatedRows, int schedId) throws ResourceNotFoundException {
            assert updatedRows >= 0;
            if (updatedRows <= 0) {
                throw new ResourceNotFoundException("schedule id=" + schedId);
            }
        }
    }

    /**
     * {@link ScheduleStore} bound to one site. Read queries pass the site id to
     * the DAO so that only schedules of that site's projects are returned.
     */
    private class DatabaseScheduleStore
    implements ScheduleStore {
        private final int siteId;

        public DatabaseScheduleStore(int siteId) {
            this.siteId = siteId;
        }

        /**
         * Lists schedules of this site in ascending id order.
         *
         * @param pageSize maximum number of schedules to return
         * @param lastId only schedules with a greater id are returned; absent means start from 0
         * @param acFilter access-control filter whose SQL is added to the query
         */
        @Override
        public List<StoredSchedule> getSchedules(int pageSize, Optional<Integer> lastId, AccessController.ListFilter acFilter) {
            return DatabaseScheduleStoreManager.this.autoCommit((handle, dao) -> dao.getSchedules(this.siteId, pageSize, lastId.or(0), acFilter.getSql()));
        }

        /**
         * Loads a schedule of this site by id.
         *
         * @throws ResourceNotFoundException if the lookup yields no schedule
         */
        @Override
        public StoredSchedule getScheduleById(int schedId) throws ResourceNotFoundException {
            return DatabaseScheduleStoreManager.this.requiredResource((handle, dao) -> dao.getScheduleById(this.siteId, schedId), "schedule id=%d", schedId);
        }

        /**
         * Lists schedules of one project of this site in ascending id order.
         *
         * @param projectId project whose schedules are listed
         * @param pageSize maximum number of schedules to return
         * @param lastId only schedules with a greater id are returned; absent means start from 0
         * @param acFilter access-control filter whose SQL is added to the query
         */
        @Override
        public List<StoredSchedule> getSchedulesByProjectId(int projectId, int pageSize, Optional<Integer> lastId, AccessController.ListFilter acFilter) {
            return DatabaseScheduleStoreManager.this.autoCommit((handle, dao) -> dao.getSchedulesByProjectId(this.siteId, projectId, pageSize, lastId.or(0), acFilter.getSql()));
        }

        /**
         * Loads the schedule of a named workflow in a project of this site.
         *
         * @throws ResourceNotFoundException if the lookup yields no schedule
         */
        @Override
        public StoredSchedule getScheduleByProjectIdAndWorkflowName(int projectId, String workflowName) throws ResourceNotFoundException {
            // The format needs a placeholder per argument; without "=%s" the workflow name
            // would be left out of the not-found message.
            return DatabaseScheduleStoreManager.this.requiredResource((handle, dao) -> dao.getScheduleByProjectIdAndWorkflowName(this.siteId, projectId, workflowName), "schedule projectId=%d workflowName=%s", projectId, workflowName);
        }

        /**
         * Locks a schedule row inside a transaction, reloads the schedule and passes it,
         * together with a control store bound to the same handle, to {@code func}.
         *
         * @param schedId id of the schedule to lock
         * @param func action run while the row is locked
         * @param exClass additional exception type that {@code func} may throw
         * @return whatever {@code func} returns
         * @throws ResourceNotFoundException if the schedule cannot be locked or loaded
         */
        private <T, E extends Exception> T combinedLockScheduleById(int schedId, ScheduleCombinedLockAction<T, E> func, Class<E> exClass) throws ResourceNotFoundException, ResourceConflictException, E {
            return DatabaseScheduleStoreManager.this.transaction((handle, dao) -> {
                // A result of 0 from the locking select is treated as "no such schedule".
                if (dao.lockScheduleById(schedId) == 0) {
                    throw new ResourceNotFoundException("schedule id=" + schedId);
                }
                StoredSchedule schedule = DatabaseScheduleStoreManager.this.requiredResource(dao.getScheduleByIdInternal(schedId), "schedule id=%d", schedId);
                return func.call(new DatabaseScheduleControlStore(handle), schedule);
            }, ResourceNotFoundException.class, ResourceConflictException.class, exClass);
        }

        /** Runs an update action on a locked schedule. */
        @Override
        public <T> T updateScheduleById(int schedId, ScheduleStore.ScheduleUpdateAction<T> func) throws ResourceNotFoundException, ResourceConflictException {
            return this.combinedLockScheduleById(schedId, (store, sched) -> func.call(store, sched), RuntimeException.class);
        }

        /** Runs a lock action on a locked schedule; the action may also throw ResourceLimitExceededException. */
        @Override
        public <T> T lockScheduleById(int schedId, ScheduleStore.ScheduleLockAction<T> func) throws ResourceNotFoundException, ResourceConflictException, ResourceLimitExceededException {
            return this.combinedLockScheduleById(schedId, (store, sched) -> func.call(store, sched), ResourceLimitExceededException.class);
        }
    }

    private static interface ScheduleCombinedLockAction<T, E extends Exception> {
        public T call(ScheduleControlStore var1, StoredSchedule var2) throws ResourceNotFoundException, ResourceConflictException, E;
    }
}

