/*
 * Decompiled with CFR 0.152.
 */
package com.github.kagkarlsson.scheduler;

import com.github.kagkarlsson.jdbc.JdbcRunner;
import com.github.kagkarlsson.jdbc.ResultSetMapper;
import com.github.kagkarlsson.jdbc.SQLRuntimeException;
import com.github.kagkarlsson.scheduler.SchedulerName;
import com.github.kagkarlsson.scheduler.Serializer;
import com.github.kagkarlsson.scheduler.StringUtils;
import com.github.kagkarlsson.scheduler.TaskRepository;
import com.github.kagkarlsson.scheduler.TaskResolver;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcTaskRepository
implements TaskRepository {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcTaskRepository.class);
    private static final int MAX_DUE_RESULTS = 10000;
    private final TaskResolver taskResolver;
    private final SchedulerName schedulerSchedulerName;
    private final JdbcRunner jdbcRunner;
    private final Serializer serializer;

    public JdbcTaskRepository(DataSource dataSource, TaskResolver taskResolver, SchedulerName schedulerSchedulerName) {
        this(dataSource, taskResolver, schedulerSchedulerName, Serializer.DEFAULT_JAVA_SERIALIZER);
    }

    public JdbcTaskRepository(DataSource dataSource, TaskResolver taskResolver, SchedulerName schedulerSchedulerName, Serializer serializer) {
        this.taskResolver = taskResolver;
        this.schedulerSchedulerName = schedulerSchedulerName;
        this.jdbcRunner = new JdbcRunner(dataSource);
        this.serializer = serializer;
    }

    @Override
    public boolean createIfNotExists(Execution execution) {
        try {
            Optional<Execution> existingExecution = this.getExecution(execution.taskInstance);
            if (existingExecution.isPresent()) {
                LOG.debug("Execution not created, it already exists. Due: {}", (Object)existingExecution.get().executionTime);
                return false;
            }
            this.jdbcRunner.execute("insert into scheduled_tasks(task_name, task_instance, task_data, execution_time, picked, version) values(?, ?, ?, ?, ?, ?)", p -> {
                p.setString(1, execution.taskInstance.getTaskName());
                p.setString(2, execution.taskInstance.getId());
                p.setObject(3, this.serializer.serialize(execution.taskInstance.getData()));
                p.setTimestamp(4, Timestamp.from(execution.executionTime));
                p.setBoolean(5, false);
                p.setLong(6, 1L);
            });
            return true;
        }
        catch (SQLRuntimeException e) {
            LOG.debug("Exception when inserting execution. Assuming it to be a constraint violation.", (Throwable)e);
            Optional<Execution> existingExecution = this.getExecution(execution.taskInstance);
            if (!existingExecution.isPresent()) {
                throw new RuntimeException("Failed to add new execution.", e);
            }
            LOG.debug("Execution not created, another thread created it.");
            return false;
        }
    }

    @Override
    public List<Execution> getDue(Instant now) {
        return this.getDue(now, 10000);
    }

    public List<Execution> getDue(Instant now, int limit) {
        return (List)this.jdbcRunner.query("select * from scheduled_tasks where picked = ? and execution_time <= ? order by execution_time asc", p -> {
            p.setBoolean(1, false);
            p.setTimestamp(2, Timestamp.from(now));
            p.setMaxRows(limit);
        }, (ResultSetMapper)new ExecutionResultSetMapper());
    }

    @Override
    public void remove(Execution execution) {
        int removed = this.jdbcRunner.execute("delete from scheduled_tasks where task_name = ? and task_instance = ? and version = ?", ps -> {
            ps.setString(1, execution.taskInstance.getTaskName());
            ps.setString(2, execution.taskInstance.getId());
            ps.setLong(3, execution.version);
        });
        if (removed != 1) {
            throw new RuntimeException("Expected one execution to be removed, but removed " + removed + ". Indicates a bug.");
        }
    }

    @Override
    public void reschedule(Execution execution, Instant nextExecutionTime, Instant lastSuccess, Instant lastFailure) {
        this.rescheduleInternal(execution, nextExecutionTime, null, lastSuccess, lastFailure);
    }

    @Override
    public void reschedule(Execution execution, Instant nextExecutionTime, Object newData, Instant lastSuccess, Instant lastFailure) {
        this.rescheduleInternal(execution, nextExecutionTime, new NewData(newData), lastSuccess, lastFailure);
    }

    private void rescheduleInternal(Execution execution, Instant nextExecutionTime, NewData newData, Instant lastSuccess, Instant lastFailure) {
        int updated = this.jdbcRunner.execute("update scheduled_tasks set picked = ?, picked_by = ?, last_heartbeat = ?, last_success = ?, last_failure = ?, execution_time = ?, " + (newData != null ? "task_data = ?, " : "") + "version = version + 1 where task_name = ? and task_instance = ? and version = ?", ps -> {
            int index = 1;
            ps.setBoolean(index++, false);
            ps.setString(index++, null);
            ps.setTimestamp(index++, null);
            ps.setTimestamp(index++, Optional.ofNullable(lastSuccess).map(Timestamp::from).orElse(null));
            ps.setTimestamp(index++, Optional.ofNullable(lastFailure).map(Timestamp::from).orElse(null));
            ps.setTimestamp(index++, Timestamp.from(nextExecutionTime));
            if (newData != null) {
                ps.setObject(index++, this.serializer.serialize(newData.data));
            }
            ps.setString(index++, execution.taskInstance.getTaskName());
            ps.setString(index++, execution.taskInstance.getId());
            ps.setLong(index++, execution.version);
        });
        if (updated != 1) {
            throw new RuntimeException("Expected one execution to be updated, but updated " + updated + ". Indicates a bug.");
        }
    }

    @Override
    public Optional<Execution> pick(Execution e, Instant timePicked) {
        int updated = this.jdbcRunner.execute("update scheduled_tasks set picked = ?, picked_by = ?, last_heartbeat = ?, version = version + 1 where picked = ? and task_name = ? and task_instance = ? and version = ?", ps -> {
            ps.setBoolean(1, true);
            ps.setString(2, StringUtils.truncate(this.schedulerSchedulerName.getName(), 50));
            ps.setTimestamp(3, Timestamp.from(timePicked));
            ps.setBoolean(4, false);
            ps.setString(5, e.taskInstance.getTaskName());
            ps.setString(6, e.taskInstance.getId());
            ps.setLong(7, e.version);
        });
        if (updated == 0) {
            LOG.trace("Failed to pick execution. It must have been picked by another scheduler.", (Object)e);
            return Optional.empty();
        }
        if (updated == 1) {
            Optional<Execution> pickedExecution = this.getExecution(e.taskInstance);
            if (!pickedExecution.isPresent()) {
                throw new IllegalStateException("Unable to find picked execution. Must have been deleted by another thread. Indicates a bug.");
            }
            if (!pickedExecution.get().isPicked()) {
                throw new IllegalStateException("Picked execution does not have expected state in database: " + pickedExecution.get());
            }
            return pickedExecution;
        }
        throw new IllegalStateException("Updated multiple rows when picking single execution. Should never happen since name and id is primary key. Execution: " + e);
    }

    @Override
    public List<Execution> getOldExecutions(Instant olderThan) {
        return (List)this.jdbcRunner.query("select * from scheduled_tasks where picked = ? and last_heartbeat <= ? order by last_heartbeat asc", p -> {
            p.setBoolean(1, true);
            p.setTimestamp(2, Timestamp.from(olderThan));
        }, (ResultSetMapper)new ExecutionResultSetMapper());
    }

    @Override
    public void updateHeartbeat(Execution e, Instant newHeartbeat) {
        int updated = this.jdbcRunner.execute("update scheduled_tasks set last_heartbeat = ? where task_name = ? and task_instance = ? and version = ?", ps -> {
            ps.setTimestamp(1, Timestamp.from(newHeartbeat));
            ps.setString(2, e.taskInstance.getTaskName());
            ps.setString(3, e.taskInstance.getId());
            ps.setLong(4, e.version);
        });
        if (updated == 0) {
            LOG.trace("Did not update heartbeat. Execution must have been removed or rescheduled.", (Object)e);
        } else {
            if (updated > 1) {
                throw new IllegalStateException("Updated multiple rows updating heartbeat for execution. Should never happen since name and id is primary key. Execution: " + e);
            }
            LOG.debug("Updated heartbeat for execution: " + e);
        }
    }

    @Override
    public List<Execution> getExecutionsFailingLongerThan(Duration interval) {
        return (List)this.jdbcRunner.query("select * from scheduled_tasks where \t(last_success is null and last_failure is not null)\tor (last_failure is not null and last_success < ?)", p -> p.setTimestamp(1, Timestamp.from(Instant.now().minus(interval))), (ResultSetMapper)new ExecutionResultSetMapper());
    }

    public Optional<Execution> getExecution(TaskInstance taskInstance) {
        return this.getExecution(taskInstance.getTaskName(), taskInstance.getId());
    }

    @Override
    public Optional<Execution> getExecution(String taskName, String taskInstanceId) {
        List executions = (List)this.jdbcRunner.query("select * from scheduled_tasks where task_name = ? and task_instance = ?", p -> {
            p.setString(1, taskName);
            p.setString(2, taskInstanceId);
        }, (ResultSetMapper)new ExecutionResultSetMapper());
        if (executions.size() > 1) {
            throw new RuntimeException(String.format("Found more than one matching execution for task name/id combination: '%s'/'%s'", taskName, taskInstanceId));
        }
        return executions.size() == 1 ? Optional.ofNullable(executions.get(0)) : Optional.empty();
    }

    private static <T> Supplier<T> memoize(final Supplier<T> original) {
        return new Supplier<T>(){
            Supplier<T> delegate = this::firstTime;
            boolean initialized;

            @Override
            public T get() {
                return this.delegate.get();
            }

            private synchronized T firstTime() {
                if (!this.initialized) {
                    Object value = original.get();
                    this.delegate = () -> value;
                    this.initialized = true;
                }
                return this.delegate.get();
            }
        };
    }

    private static class NewData {
        private final Object data;

        NewData(Object data) {
            this.data = data;
        }
    }

    private class ExecutionResultSetMapper
    implements ResultSetMapper<List<Execution>> {
        private ExecutionResultSetMapper() {
        }

        public List<Execution> map(ResultSet rs) throws SQLException {
            ArrayList<Execution> executions = new ArrayList<Execution>();
            while (rs.next()) {
                String taskName = rs.getString("task_name");
                Optional<Task> task = JdbcTaskRepository.this.taskResolver.resolve(taskName);
                if (!task.isPresent()) {
                    LOG.error("Failed to find implementation for task with name '{}'. Either delete the execution from the databaser, or add an implementation for it.", (Object)taskName);
                    continue;
                }
                String instanceId = rs.getString("task_instance");
                byte[] data = rs.getBytes("task_data");
                Instant executionTime = rs.getTimestamp("execution_time").toInstant();
                boolean picked = rs.getBoolean("picked");
                String pickedBy = rs.getString("picked_by");
                Instant lastSuccess = Optional.ofNullable(rs.getTimestamp("last_success")).map(Timestamp::toInstant).orElse(null);
                Instant lastFailure = Optional.ofNullable(rs.getTimestamp("last_failure")).map(Timestamp::toInstant).orElse(null);
                Instant lastHeartbeat = Optional.ofNullable(rs.getTimestamp("last_heartbeat")).map(Timestamp::toInstant).orElse(null);
                long version = rs.getLong("version");
                Supplier dataSupplier = JdbcTaskRepository.memoize(() -> JdbcTaskRepository.this.serializer.deserialize(((Task)task.get()).getDataClass(), data));
                executions.add(new Execution(executionTime, new TaskInstance(taskName, instanceId, dataSupplier), picked, pickedBy, lastSuccess, lastFailure, lastHeartbeat, version));
            }
            return executions;
        }
    }
}

