/*
 * Decompiled with CFR 0.152.
 */
package io.digdag.core.database;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.digdag.core.ErrorReporter;
import io.digdag.core.database.BasicDatabaseStoreManager;
import io.digdag.core.database.ConfigMapper;
import io.digdag.core.database.DatabaseConfig;
import io.digdag.core.database.DatabaseTaskQueueConfig;
import io.digdag.core.database.LocalLockMap;
import io.digdag.core.database.TransactionManager;
import io.digdag.core.log.LogMarkers;
import io.digdag.core.repository.ResourceConflictException;
import io.digdag.spi.ImmutableTaskQueueLock;
import io.digdag.spi.TaskConflictException;
import io.digdag.spi.TaskNotFoundException;
import io.digdag.spi.TaskQueueLock;
import io.digdag.spi.TaskQueueRequest;
import io.digdag.spi.TaskQueueServer;
import io.digdag.spi.metrics.DigdagMetrics;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.Update;
import org.skife.jdbi.v2.sqlobject.Bind;
import org.skife.jdbi.v2.sqlobject.GetGeneratedKeys;
import org.skife.jdbi.v2.sqlobject.SqlQuery;
import org.skife.jdbi.v2.sqlobject.SqlUpdate;
import org.skife.jdbi.v2.tweak.ResultSetMapper;

public class DatabaseTaskQueueServer
extends BasicDatabaseStoreManager<Dao>
implements TaskQueueServer {
    // Queue configuration; read in tryLockSharedAgentTasks for the per-site max concurrency.
    private final DatabaseTaskQueueConfig queueConfig;
    // Stored by the constructor; not referenced by any other code visible in this class.
    private final ObjectMapper taskObjectMapper;
    // Initial delay and delay between runs of the lock-expiration job; start() passes it with TimeUnit.SECONDS.
    private final int expireLockInterval;
    // In-process lock keyed by site id; held by tryLockSharedAgentTasks while it runs its locking statements.
    private final LocalLockMap localLockMap = new LocalLockMap();
    // Single-thread scheduled executor (daemon thread named "lock-expire-%d") that runs expireLocks().
    private final ScheduledExecutorService expireExecutor;
    // Used by start() to run each expireLocks() call through TransactionManager.begin.
    private final TransactionManager transactionManager;
    // Optional injection; stays at the ErrorReporter.empty() default when nothing is injected.
    @Inject(optional=true)
    private ErrorReporter errorReporter = ErrorReporter.empty();
    // Injected metrics sink; expireLocks() increments "uncaughtErrors" on it when the update fails.
    @Inject
    private DigdagMetrics metrics;
    // Monitor object: sleepForEnqueue() waits on it and interruptLocalWait() calls notifyAll() on it.
    private final Object localTaskNoticeHelper = new Object();

    @Inject
    public DatabaseTaskQueueServer(DatabaseConfig config, TransactionManager tm, ConfigMapper cfm, DatabaseTaskQueueConfig queueConfig, ObjectMapper taskObjectMapper) {
        super(config.getType(), Dao.class, tm, cfm);
        this.queueConfig = queueConfig;
        this.taskObjectMapper = taskObjectMapper;
        this.transactionManager = tm;
        this.expireLockInterval = config.getExpireLockInterval();
        this.expireExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("lock-expire-%d").build());
    }

    /**
     * Wakes every thread that is currently waiting on the local task-notice monitor
     * (i.e. threads blocked in {@code sleepForEnqueue}).
     */
    @SuppressFBWarnings("NN_NAKED_NOTIFY")
    public void interruptLocalWait() {
        synchronized (this.localTaskNoticeHelper) {
            this.localTaskNoticeHelper.notifyAll();
        }
    }

    /**
     * Blocks the calling thread on the local task-notice monitor until it is notified
     * (see {@code interruptLocalWait}) or the timeout elapses.
     *
     * <p>If the thread is interrupted while waiting, the interrupt flag is restored and the
     * method returns normally.
     *
     * @param maxSleepMillis timeout passed straight to {@link Object#wait(long)}
     */
    private void sleepForEnqueue(long maxSleepMillis) {
        final Object monitor = this.localTaskNoticeHelper;
        try {
            synchronized (monitor) {
                monitor.wait(maxSleepMillis);
            }
        }
        catch (InterruptedException interrupted) {
            // Preserve the interruption for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Schedules the periodic lock-expiration job on the expiration executor.
     *
     * <p>The job runs {@code expireLocks()} through {@code TransactionManager.begin}, first after
     * {@code expireLockInterval} seconds and then with the same delay between runs.
     *
     * @throws Exception declared for compatibility with existing callers
     */
    @PostConstruct
    public void start() throws Exception {
        this.expireExecutor.scheduleWithFixedDelay(() -> {
            try {
                this.transactionManager.begin(() -> {
                    this.expireLocks();
                    return null;
                });
            }
            catch (Throwable t) {
                // scheduleWithFixedDelay suppresses all subsequent executions once a run throws.
                // expireLocks() guards its own body, but a failure raised by begin() itself would
                // escape and silently stop lock expiration for good, so it is contained here.
                this.logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "Lock expiration run failed. It will be retried at the next scheduled run.", t);
                this.errorReporter.reportUncaughtError(t);
                this.metrics.increment(DigdagMetrics.Category.DB, "uncaughtErrors");
            }
        }, this.expireLockInterval, this.expireLockInterval, TimeUnit.SECONDS);
    }

    /**
     * Calls {@code shutdown()} on the lock-expiration executor.
     */
    @PreDestroy
    public void shutdown() {
        expireExecutor.shutdown();
    }

    /**
     * Tells whether the configured database type is the one handled by the embedded code paths.
     *
     * @return {@code true} only when the database type is exactly {@code "h2"}
     */
    private boolean isEmbededDatabase() {
        return this.databaseType.equals("h2");
    }

    /**
     * Returns the SQL expression that the non-embedded code paths splice into statements
     * to obtain the current time as epoch seconds on the database side.
     *
     * @return the SQL fragment, without surrounding whitespace
     */
    private String statementUnixTimestampSql() {
        final String epochSecondsExpression = "extract(epoch from now())";
        return epochSecondsExpression;
    }

    /**
     * Enqueues a task for the given site without binding it to a queue (queue id is {@code null}).
     *
     * @param siteId site the task belongs to
     * @param request priority, unique name and optional payload of the task
     * @throws TaskConflictException when the underlying insert reports a resource conflict
     */
    public void enqueueDefaultQueueTask(int siteId, TaskQueueRequest request) throws TaskConflictException {
        try {
            final int priority = request.getPriority();
            final String uniqueName = request.getUniqueName();
            final byte[] payload = (byte[])request.getData().orNull();
            this.enqueue(siteId, null, priority, uniqueName, payload);
        }
        catch (ResourceConflictException conflict) {
            // Translate the storage-level conflict into the SPI exception, keeping the cause.
            throw new TaskConflictException((Throwable)conflict);
        }
    }

    /**
     * Enqueues a task bound to a specific queue.
     *
     * <p>The queue's shared site id is looked up first and stored with the task; the lookup
     * result may be {@code null}, which is passed through unchanged.
     *
     * @param queueId queue the task is bound to
     * @param request priority, unique name and optional payload of the task
     * @throws TaskConflictException when the underlying insert reports a resource conflict
     */
    public void enqueueQueueBoundTask(int queueId, TaskQueueRequest request) throws TaskConflictException {
        try {
            final Integer sharedSiteId = this.autoCommit((handle, dao) -> dao.getSharedSiteId(queueId));
            final int priority = request.getPriority();
            final String uniqueName = request.getUniqueName();
            final byte[] payload = (byte[])request.getData().orNull();
            this.enqueue(sharedSiteId, queueId, priority, uniqueName, payload);
        }
        catch (ResourceConflictException conflict) {
            // Translate the storage-level conflict into the SPI exception, keeping the cause.
            throw new TaskConflictException((Throwable)conflict);
        }
    }

    /**
     * Inserts a queued task and its lock row in one transaction, then wakes local waiters.
     *
     * @param siteId site id stored with the task and the lock row, may be {@code null}
     * @param queueId queue id stored with the task and the lock row, may be {@code null}
     * @param priority priority stored on the lock row
     * @param uniqueName unique name stored with the task
     * @param data task payload, may be {@code null}
     * @return the generated id of the inserted task
     * @throws ResourceConflictException when inserting the task row is reported as a conflict
     */
    private long enqueue(@Nullable Integer siteId, @Nullable Integer queueId, int priority, String uniqueName, @Nullable byte[] data) throws ResourceConflictException {
        final long newTaskId = this.transaction((handle, dao) -> {
            // The task row goes first; its generated key becomes the id of the lock row.
            long insertedId = this.catchConflict(
                    () -> dao.insertQueuedTask(siteId, queueId, uniqueName, data),
                    "lock of task name=%s in site id = %d and queue id=%d",
                    uniqueName, siteId, queueId);
            dao.insertQueuedTaskLock(insertedId, siteId, queueId, priority);
            return insertedId;
        }, ResourceConflictException.class);

        // Let threads sleeping in this JVM pick the new task up without waiting out their timeout.
        this.interruptLocalWait();
        return newTaskId;
    }

    /**
     * Formats the external lock id of a shared-agent task: the letter {@code s} followed by
     * the numeric lock id.
     *
     * @param taskLockId numeric lock id
     * @return formatted lock id, e.g. {@code s42}
     */
    private static String formatSharedTaskLockId(long taskLockId) {
        return new StringBuilder("s").append(taskLockId).toString();
    }

    /**
     * Formats the external lock id of a queue-bound task: the letter {@code q}, the numeric
     * lock id, a dot, and the queue id.
     *
     * @param taskLockId numeric lock id
     * @param queueId id of the queue the task is bound to
     * @return formatted lock id, e.g. {@code q42.7}
     */
    private static String formatQueueBoundTaskLockId(long taskLockId, int queueId) {
        return new StringBuilder("q")
                .append(taskLockId)
                .append(".")
                .append(queueId)
                .toString();
    }

    /**
     * Tells whether a formatted lock id denotes a shared-agent task lock.
     *
     * @param formatted formatted lock id
     * @return {@code true} when the id begins with the letter {@code s}
     */
    private static boolean isSharedTaskLockId(String formatted) {
        return !formatted.isEmpty() && formatted.charAt(0) == 's';
    }

    /**
     * Extracts the numeric lock id from a formatted lock id.
     *
     * <p>Everything from the first dot onwards is ignored, the one-character type prefix is
     * skipped, and the remainder is parsed as a {@code long}.
     *
     * @param formatted formatted lock id, e.g. {@code s42} or {@code q42.7}
     * @return the numeric lock id
     * @throws NumberFormatException when the part after the prefix is not a valid {@code long}
     * @throws StringIndexOutOfBoundsException when there is no character before the first dot
     */
    private static long parseTaskLockId(String formatted) {
        final int dotAt = formatted.indexOf('.');
        final String prefixed = dotAt < 0 ? formatted : formatted.substring(0, dotAt);
        // Drop the leading type letter.
        final String digits = prefixed.substring(1);
        return Long.parseLong(digits);
    }

    /**
     * Extracts the queue id from a formatted queue-bound lock id.
     *
     * @param formatted formatted lock id of the form {@code <prefix><lockId>.<queueId>}
     * @return the queue id found after the dot
     * @throws IllegalArgumentException when splitting on dots does not yield exactly two parts
     * @throws NumberFormatException when the part after the dot is not a valid {@code int}
     */
    private static int parseQueueId(String formatted) {
        final String[] parts = formatted.split("\\.");
        if (parts.length == 2) {
            return Integer.parseInt(parts[1]);
        }
        throw new IllegalArgumentException("Invalid queue-bound task lock id: " + formatted);
    }

    /**
     * Deletes a queued task and its lock, identified by a formatted lock id.
     *
     * @param siteId site the task must belong to
     * @param lockId formatted lock id
     * @param agentId agent that must currently hold the lock
     * @throws TaskNotFoundException when no task row matches the lock id and site id
     * @throws TaskConflictException when no lock row matches the lock id and agent id
     */
    public void deleteTask(int siteId, String lockId, String agentId) throws TaskNotFoundException, TaskConflictException {
        this.deleteTask0(siteId, DatabaseTaskQueueServer.parseTaskLockId(lockId), agentId);
    }

    /**
     * Deletes the task row and then the lock row inside a single transaction.
     *
     * @param siteId site the task must belong to
     * @param taskLockId numeric lock id (also the task id)
     * @param agentId agent that must currently hold the lock
     * @throws TaskNotFoundException when the task delete affects no rows
     * @throws TaskConflictException when the lock delete affects no rows
     */
    private void deleteTask0(int siteId, long taskLockId, String agentId) throws TaskNotFoundException, TaskConflictException {
        this.transaction((handle, dao) -> {
            final int removedTasks = dao.deleteQueuedTask(siteId, taskLockId);
            if (removedTasks == 0) {
                throw new TaskNotFoundException("Deleting lock does not exist: lock id=" + taskLockId + " site id=" + siteId);
            }

            // The lock row is only removed when it is still owned by this agent.
            final int removedLocks = dao.deleteQueuedTaskLock(taskLockId, agentId);
            if (removedLocks == 0) {
                throw new TaskConflictException("Deleting lock does not exist or preempted by another agent: lock id=" + taskLockId + " agent id=" + agentId);
            }
            return true;
        }, TaskNotFoundException.class, TaskConflictException.class);
    }

    /**
     * Deletes a queued task and its lock regardless of site or owning agent.
     *
     * @param lockId formatted lock id
     * @return {@code true} when a task row or a lock row was deleted
     */
    public boolean forceDeleteTask(String lockId) {
        return this.forceDeleteTask0(DatabaseTaskQueueServer.parseTaskLockId(lockId));
    }

    /**
     * Unconditionally deletes the task row and the lock row with the given id in one transaction.
     *
     * @param taskLockId numeric lock id (also the task id)
     * @return {@code true} when at least one of the two deletes affected a row
     */
    private boolean forceDeleteTask0(long taskLockId) {
        return this.transaction((handle, dao) -> {
            // Both deletes always run; the result only reports whether either found a row.
            final boolean taskRemoved = dao.forceDeleteQueuedTask(taskLockId) > 0;
            final boolean lockRemoved = dao.forceDeleteQueuedTaskLock(taskLockId) > 0;
            return taskRemoved || lockRemoved;
        });
    }

    /**
     * Extends the lock expiration of each given lock and reports the ones that could not be extended.
     *
     * @param siteId site the locks must belong to
     * @param lockedIds formatted lock ids to refresh
     * @param agentId agent that must currently hold the locks
     * @param lockSeconds number of seconds from now until the refreshed locks expire
     * @return the formatted lock ids whose update affected no rows, in input order
     */
    public List<String> taskHeartbeat(int siteId, List<String> lockedIds, String agentId, int lockSeconds) {
        ImmutableList.Builder<String> missing = ImmutableList.builder();
        for (String lockId : lockedIds) {
            // Shared-agent lock ids carry no queue id; queue-bound ones encode it after the dot.
            final Integer queueId;
            if (DatabaseTaskQueueServer.isSharedTaskLockId(lockId)) {
                queueId = null;
            }
            else {
                queueId = DatabaseTaskQueueServer.parseQueueId(lockId);
            }
            final long taskLockId = DatabaseTaskQueueServer.parseTaskLockId(lockId);

            if (!this.taskHeartbeat0(siteId, queueId, taskLockId, agentId, lockSeconds)) {
                missing.add(lockId);
            }
        }
        return missing.build();
    }

    /**
     * Pushes the expiration time of one lock {@code lockSeconds} into the future.
     *
     * <p>The row is updated only when it has the given id, is held by {@code agentId}, and its
     * site id (or, when the row has none, the site id looked up from {@code queue_settings} by
     * {@code queueId}) equals {@code siteId}.
     *
     * @param siteId site the lock must belong to
     * @param queueId queue id used for the site lookup, {@code null} for shared-agent locks
     * @param taskLockId numeric lock id
     * @param agentId agent that must currently hold the lock
     * @param lockSeconds number of seconds from now until the lock expires
     * @return {@code true} when at least one row was updated
     */
    private boolean taskHeartbeat0(int siteId, Integer queueId, long taskLockId, String agentId, int lockSeconds) {
        final int updated = this.autoCommit((handle, dao) -> {
            // The new expiration time is spliced into the SQL text rather than bound:
            // a literal computed from the local clock for the embedded database, a
            // database-side expression otherwise. The statement therefore has no
            // :expireTime placeholder and nothing is bound under that name.
            final String lockExpireTimeSql;
            if (this.isEmbededDatabase()) {
                lockExpireTimeSql = Long.toString(Instant.now().getEpochSecond() + (long)lockSeconds);
            }
            else {
                lockExpireTimeSql = this.statementUnixTimestampSql() + " + " + Integer.toString(lockSeconds);
            }
            return handle.createStatement("update queued_task_locks set lock_expire_time = " + lockExpireTimeSql + " where id = :id and lock_agent_id = :agentId and coalesce(site_id, (select site_id from queue_settings where id = :queueId)) = :siteId")
                    .bind("id", taskLockId)
                    .bind("agentId", agentId)
                    .bind("queueId", queueId)
                    .bind("siteId", siteId)
                    .execute();
        });
        return updated > 0;
    }

    /**
     * Tries to lock up to {@code count} shared-agent tasks for the given agent.
     *
     * <p>Sites that have unlocked tasks are visited in random order; the first site that yields
     * at least one lock ends the search and its locked tasks are returned. Locked ids whose task
     * data cannot be loaded are skipped. When no site yields a lock, the call optionally sleeps
     * until a task is enqueued in this JVM or the timeout elapses, and then returns an empty list.
     *
     * @param count maximum number of tasks to lock
     * @param agentId agent that will hold the locks
     * @param lockSeconds number of seconds until the new locks expire
     * @param maxSleepMillis maximum time to sleep when nothing could be locked; zero or a
     *     negative value returns immediately
     * @return the locked tasks, or an empty list when nothing could be locked
     */
    public List<TaskQueueLock> lockSharedAgentTasks(int count, String agentId, int lockSeconds, long maxSleepMillis) {
        List<Integer> siteIds = this.autoCommit((handle, dao) -> dao.getActiveSiteIdList());
        Collections.shuffle(siteIds);
        for (int siteId : siteIds) {
            List<Long> taskLockIds = this.tryLockSharedAgentTasks(siteId, count, agentId, lockSeconds);
            if (taskLockIds.isEmpty()) {
                continue;
            }
            ImmutableList.Builder<TaskQueueLock> locked = ImmutableList.builder();
            for (long taskLockId : taskLockIds) {
                ImmutableTaskQueueLock data = this.autoCommit((handle, dao) -> dao.getTaskData(taskLockId));
                if (data == null) {
                    // The lock row exists but the task row could not be read; leave it out.
                    continue;
                }
                String lockId = DatabaseTaskQueueServer.formatSharedTaskLockId(taskLockId);
                locked.add(data.withLockId(lockId));
            }
            return locked.build();
        }

        // Object.wait(0) means "wait with no timeout", so a zero maximum must skip the sleep
        // entirely; otherwise the caller would block until some task is enqueued locally.
        if (maxSleepMillis > 0L) {
            this.sleepForEnqueue(maxSleepMillis);
        }
        return ImmutableList.of();
    }

    /**
     * Attempts to lock up to {@code count} unlocked tasks of one site for the given agent.
     *
     * <p>The per-site local lock is taken first (timeout value 500, presumably milliseconds —
     * confirm against {@code LocalLockMap}); when it cannot be taken, or the thread is
     * interrupted while waiting for it, an empty list is returned. The local lock is always
     * released before returning.
     *
     * <p>For the embedded database the candidate ids are selected and then marked as locked in
     * one transaction; otherwise the {@code lock_shared_tasks} database function is called.
     *
     * @param siteId site whose tasks should be locked
     * @param count maximum number of tasks to lock
     * @param agentId agent that will hold the locks
     * @param lockSeconds number of seconds until the new locks expire
     * @return ids of the locked tasks, possibly empty
     */
    private List<Long> tryLockSharedAgentTasks(int siteId, int count, String agentId, int lockSeconds) {
        int siteMaxConcurrency = this.queueConfig.getSiteMaxConcurrency(siteId);
        try {
            if (!this.localLockMap.tryLock(siteId, 500L)) {
                return ImmutableList.of();
            }
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return ImmutableList.of();
        }
        try {
            if (this.isEmbededDatabase()) {
                return this.transaction((handle, dao) -> {
                    List<Long> taskLockIds = handle.createQuery(
                            "select id from queued_task_locks"
                            + " where lock_expire_time is null and site_id = :siteId"
                            + " and not exists (select * from (select queue_id, count(*) as count from queued_task_locks where lock_expire_time is not null and site_id = :siteId group by queue_id) runnings join queues on queues.id = runnings.queue_id where runnings.count >= queues.max_concurrency and runnings.queue_id = queued_task_locks.queue_id)"
                            + " and not exists (select count(*) from queued_task_locks where lock_expire_time is not null and site_id = :siteId having count(*) >= :siteMaxConcurrency)"
                            + " order by queue_id, priority desc, id limit :limit")
                            .bind("siteId", siteId)
                            .bind("siteMaxConcurrency", siteMaxConcurrency)
                            .bind("limit", count)
                            .mapTo(long.class)
                            .list();
                    if (taskLockIds.isEmpty()) {
                        // Nothing to lock: skip the update instead of issuing "where id in ()",
                        // which updates no rows at best and is a syntax error on stricter engines.
                        return taskLockIds;
                    }
                    String idList = taskLockIds.stream()
                            .map(String::valueOf)
                            .collect(Collectors.joining(", "));
                    handle.createStatement("update queued_task_locks set lock_expire_time = :expireTime, lock_agent_id = :agentId where id in (" + idList + ")")
                            .bind("expireTime", Instant.now().getEpochSecond() + (long)lockSeconds)
                            .bind("agentId", agentId)
                            .execute();
                    return taskLockIds;
                });
            }
            return this.autoCommit((handle, dao) ->
                    handle.createQuery("select lock_shared_tasks(:siteId, :siteMaxConcurrency, :limit, :lockExpireSeconds, :agentId)")
                            .bind("siteId", siteId)
                            .bind("siteMaxConcurrency", siteMaxConcurrency)
                            .bind("limit", count)
                            .bind("lockExpireSeconds", lockSeconds)
                            .bind("agentId", agentId)
                            .mapTo(long.class)
                            .list());
        }
        finally {
            this.localLockMap.unlock(siteId);
        }
    }

    /**
     * Releases every lock whose expiration time has passed.
     *
     * <p>Expired rows get their expiration time and agent id cleared and their retry count
     * incremented. The embedded database compares against the local clock; other databases
     * compare against a database-side timestamp expression. Any failure is logged, reported to
     * the error reporter and counted in metrics, and is never propagated to the caller.
     */
    @VisibleForTesting
    void expireLocks() {
        try {
            final int expired = this.autoCommit((handle, dao) -> {
                if (!this.isEmbededDatabase()) {
                    final String sql = "update queued_task_locks set lock_expire_time = NULL, lock_agent_id = NULL, retry_count = retry_count + 1 where lock_expire_time is not null and lock_expire_time < " + this.statementUnixTimestampSql();
                    return handle.createStatement(sql).execute();
                }
                return handle.createStatement("update queued_task_locks set lock_expire_time = NULL, lock_agent_id = NULL, retry_count = retry_count + 1 where lock_expire_time is not null and lock_expire_time < :expireTime")
                        .bind("expireTime", Instant.now().getEpochSecond())
                        .execute();
            });
            if (expired > 0) {
                this.logger.warn("{} task locks are expired. Tasks will be retried.", (Object)expired);
            }
        }
        catch (Throwable failure) {
            this.logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "An uncaught exception is ignored. This lock expiration thread will be restarted.", failure);
            this.errorReporter.reportUncaughtError(failure);
            this.metrics.increment(DigdagMetrics.Category.DB, "uncaughtErrors");
        }
    }

    /**
     * SQL-object interface for the {@code queues}, {@code queued_tasks} and
     * {@code queued_task_locks} tables.
     */
    public interface Dao {
        /** Looks up the {@code shared_site_id} column of a queue. */
        @SqlQuery("select shared_site_id from queues where id = :queueId")
        Integer getSharedSiteId(@Bind("queueId") long queueId);

        /**
         * Lists the distinct non-null site ids that have at least one lock row without an
         * expiration time, in ascending order. The recursive query fetches one site id per step.
         */
        @SqlQuery("with recursive t (site_id) as ((select site_id from queued_task_locks where lock_expire_time is null and site_id is not null order by site_id limit 1) union all select (select site_id from queued_task_locks where lock_expire_time is null and site_id is not null and site_id > t.site_id order by site_id limit 1) from t where t.site_id is not null) select site_id as id from t where site_id is not null")
        List<Integer> getActiveSiteIdList();

        /** Inserts a task row stamped with the current database time and returns its generated key. */
        @SqlUpdate("insert into queued_tasks (site_id, queue_id, unique_name, data, created_at) values (:siteId, :queueId, :uniqueName, :data, now())")
        @GetGeneratedKeys
        long insertQueuedTask(@Bind("siteId") Integer siteId, @Bind("queueId") Integer queueId, @Bind("uniqueName") String uniqueName, @Bind("data") byte[] data);

        /** Inserts the lock row for a task; expiration time and agent id are not set here. */
        @SqlUpdate("insert into queued_task_locks (id, site_id, queue_id, priority) values (:id, :siteId, :queueId, :priority)")
        void insertQueuedTaskLock(@Bind("id") long id, @Bind("siteId") Integer siteId, @Bind("queueId") Integer queueId, @Bind("priority") int priority);

        /** Reads the unique name and payload of a task by id. */
        @SqlQuery("select unique_name, data from queued_tasks where id = :taskLockId")
        ImmutableTaskQueueLock getTaskData(@Bind("taskLockId") long taskLockId);

        /** Deletes a lock row only when it is held by the given agent; returns the affected row count. */
        @SqlUpdate("delete from queued_task_locks where id = :taskLockId and lock_agent_id = :agentId")
        int deleteQueuedTaskLock(@Bind("taskLockId") long taskLockId, @Bind("agentId") String agentId);

        /** Deletes a lock row by id regardless of its holder; returns the affected row count. */
        @SqlUpdate("delete from queued_task_locks where id = :taskLockId")
        int forceDeleteQueuedTaskLock(@Bind("taskLockId") long taskLockId);

        /** Deletes a task row only when it belongs to the given site; returns the affected row count. */
        @SqlUpdate("delete from queued_tasks where id = :taskLockId and site_id = :siteId")
        int deleteQueuedTask(@Bind("siteId") int siteId, @Bind("taskLockId") long taskLockId);

        /** Deletes a task row by id regardless of its site; returns the affected row count. */
        @SqlUpdate("delete from queued_tasks where id = :taskLockId")
        int forceDeleteQueuedTask(@Bind("taskLockId") long taskLockId);
    }

    static class ImmutableTaskQueueLockMapper
    implements ResultSetMapper<ImmutableTaskQueueLock> {
        ImmutableTaskQueueLockMapper() {
        }

        public ImmutableTaskQueueLock map(int index, ResultSet r, StatementContext ctx) throws SQLException {
            return ImmutableTaskQueueLock.builder().lockId("").uniqueName(r.getString("unique_name")).data(BasicDatabaseStoreManager.getOptionalBytes(r, "data")).build();
        }
    }
}

