/*
 * Decompiled with CFR 0.152.
 */
package com.github.kagkarlsson.scheduler.jdbc;

import com.github.kagkarlsson.scheduler.Clock;
import com.github.kagkarlsson.scheduler.ScheduledExecutionsFilter;
import com.github.kagkarlsson.scheduler.SchedulerName;
import com.github.kagkarlsson.scheduler.StringUtils;
import com.github.kagkarlsson.scheduler.TaskRepository;
import com.github.kagkarlsson.scheduler.TaskResolver;
import com.github.kagkarlsson.scheduler.exceptions.ExecutionException;
import com.github.kagkarlsson.scheduler.exceptions.FailedToScheduleBatchException;
import com.github.kagkarlsson.scheduler.exceptions.TaskInstanceException;
import com.github.kagkarlsson.scheduler.jdbc.AndCondition;
import com.github.kagkarlsson.scheduler.jdbc.AutodetectJdbcCustomization;
import com.github.kagkarlsson.scheduler.jdbc.JdbcCustomization;
import com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepositoryContext;
import com.github.kagkarlsson.scheduler.jdbc.QueryBuilder;
import com.github.kagkarlsson.scheduler.serializer.Serializer;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.SchedulableInstance;
import com.github.kagkarlsson.scheduler.task.ScheduledTaskInstance;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import com.github.kagkarlsson.shaded.jdbc.JdbcRunner;
import com.github.kagkarlsson.shaded.jdbc.ResultSetMapper;
import com.github.kagkarlsson.shaded.jdbc.SQLRuntimeException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcTaskRepository
implements TaskRepository {
    public static final String DEFAULT_TABLE_NAME = "scheduled_tasks";
    private static final Logger LOG = LoggerFactory.getLogger(JdbcTaskRepository.class);
    private final TaskResolver taskResolver;
    private final SchedulerName schedulerSchedulerName;
    private final JdbcRunner jdbcRunner;
    private final Serializer serializer;
    private final String tableName;
    private final JdbcCustomization jdbcCustomization;
    private final boolean orderByPriority;
    private final Clock clock;
    private final String insertQuery;

    public JdbcTaskRepository(DataSource dataSource, boolean commitWhenAutocommitDisabled, String tableName, TaskResolver taskResolver, SchedulerName schedulerSchedulerName, boolean orderByPriority, Clock clock) {
        this(dataSource, commitWhenAutocommitDisabled, new AutodetectJdbcCustomization(dataSource), tableName, taskResolver, schedulerSchedulerName, Serializer.DEFAULT_JAVA_SERIALIZER, orderByPriority, clock);
    }

    public JdbcTaskRepository(DataSource dataSource, boolean commitWhenAutocommitDisabled, JdbcCustomization jdbcCustomization, String tableName, TaskResolver taskResolver, SchedulerName schedulerSchedulerName, boolean orderByPriority, Clock clock) {
        this(dataSource, commitWhenAutocommitDisabled, jdbcCustomization, tableName, taskResolver, schedulerSchedulerName, Serializer.DEFAULT_JAVA_SERIALIZER, orderByPriority, clock);
    }

    public JdbcTaskRepository(DataSource dataSource, boolean commitWhenAutocommitDisabled, JdbcCustomization jdbcCustomization, String tableName, TaskResolver taskResolver, SchedulerName schedulerSchedulerName, Serializer serializer, boolean orderByPriority, Clock clock) {
        this(jdbcCustomization, tableName, taskResolver, schedulerSchedulerName, serializer, new JdbcRunner(dataSource, commitWhenAutocommitDisabled), orderByPriority, clock);
    }

    protected JdbcTaskRepository(JdbcCustomization jdbcCustomization, String tableName, TaskResolver taskResolver, SchedulerName schedulerSchedulerName, Serializer serializer, JdbcRunner jdbcRunner, boolean orderByPriority, Clock clock) {
        this.tableName = tableName;
        this.taskResolver = taskResolver;
        this.schedulerSchedulerName = schedulerSchedulerName;
        this.jdbcRunner = jdbcRunner;
        this.serializer = serializer;
        this.jdbcCustomization = jdbcCustomization;
        this.orderByPriority = orderByPriority;
        this.clock = clock;
        this.insertQuery = this.getInsertQuery(tableName);
    }

    public boolean createIfNotExists(SchedulableInstance instance) {
        return this.createIfNotExists(new ScheduledTaskInstance(instance.getTaskInstance(), instance.getNextExecutionTime(this.clock.now())));
    }

    @Override
    public boolean createIfNotExists(ScheduledTaskInstance instance) {
        TaskInstance<?> taskInstance = instance.getTaskInstance();
        try {
            Optional<Execution> existingExecution = this.getExecution(taskInstance);
            if (existingExecution.isPresent()) {
                LOG.debug("Execution not created, it already exists. Due: {}", (Object)existingExecution.get().executionTime);
                return false;
            }
            this.jdbcRunner.execute(this.insertQuery, p -> this.setInsertParameters(instance, p));
            return true;
        }
        catch (SQLRuntimeException e) {
            LOG.debug("Exception when inserting execution. Assuming it to be a constraint violation.", (Throwable)e);
            Optional<Execution> existingExecution = this.getExecution(taskInstance);
            if (existingExecution.isEmpty()) {
                throw new TaskInstanceException("Failed to add new execution.", instance.getTaskName(), instance.getId(), e);
            }
            LOG.debug("Execution not created, another thread created it.");
            return false;
        }
    }

    @Override
    public void createBatch(List<ScheduledTaskInstance> executions) {
        try {
            this.jdbcRunner.executeBatch(this.insertQuery, executions, this::setInsertParameters);
        }
        catch (SQLRuntimeException e) {
            LOG.debug("Failed to create all executions. Some might already exist.", (Throwable)e);
            throw new FailedToScheduleBatchException("Failed to create all executions.", e);
        }
    }

    private void setInsertParameters(ScheduledTaskInstance value, PreparedStatement ps) throws SQLException {
        TaskInstance<?> taskInstance = value.getTaskInstance();
        int index = 1;
        ps.setString(index++, taskInstance.getTaskName());
        ps.setString(index++, taskInstance.getId());
        this.jdbcCustomization.setTaskData(ps, index++, this.serializer.serialize(taskInstance.getData()));
        this.jdbcCustomization.setInstant(ps, index++, value.getExecutionTime());
        ps.setBoolean(index++, false);
        ps.setLong(index++, 1L);
        if (this.orderByPriority) {
            ps.setInt(index, taskInstance.getPriority());
        }
    }

    private String getInsertQuery(String tableName) {
        return "insert into " + tableName + "(task_name, task_instance, task_data, execution_time, picked, version" + (this.orderByPriority ? ", priority" : "") + ") values(?, ?, ?, ?, ?, ? " + (this.orderByPriority ? ", ?" : "") + ")";
    }

    public Instant replace(Execution toBeReplaced, SchedulableInstance newInstance) {
        return this.replace(toBeReplaced, new ScheduledTaskInstance(newInstance.getTaskInstance(), newInstance.getNextExecutionTime(this.clock.now())));
    }

    @Override
    public Instant replace(Execution toBeReplaced, ScheduledTaskInstance newInstance) {
        Instant newExecutionTime = newInstance.getExecutionTime();
        Execution newExecution = new Execution(newExecutionTime, newInstance.getTaskInstance());
        Object newData = newInstance.getTaskInstance().getData();
        int updated = this.jdbcRunner.execute("update " + this.tableName + " set task_name = ?, task_instance = ?, picked = ?, picked_by = ?, last_heartbeat = ?, last_success = ?, last_failure = ?, consecutive_failures = ?, execution_time = ?, task_data = ?, version = 1 where task_name = ? and task_instance = ? and version = ?", ps -> {
            int index = 1;
            ps.setString(index++, newExecution.taskInstance.getTaskName());
            ps.setString(index++, newExecution.taskInstance.getId());
            ps.setBoolean(index++, false);
            ps.setString(index++, null);
            this.jdbcCustomization.setInstant(ps, index++, null);
            this.jdbcCustomization.setInstant(ps, index++, null);
            this.jdbcCustomization.setInstant(ps, index++, null);
            ps.setInt(index++, 0);
            this.jdbcCustomization.setInstant(ps, index++, newExecutionTime);
            this.jdbcCustomization.setTaskData(ps, index++, this.serializer.serialize(newData));
            ps.setString(index++, toBeReplaced.taskInstance.getTaskName());
            ps.setString(index++, toBeReplaced.taskInstance.getId());
            ps.setLong(index, toBeReplaced.version);
        });
        if (updated == 0) {
            throw new IllegalStateException("Failed to replace execution, found none matching " + String.valueOf(toBeReplaced));
        }
        if (updated > 1) {
            LOG.error("Expected one execution to be updated, but updated {}. Indicates a bug. Replaced {} with {}", new Object[]{updated, toBeReplaced.taskInstance, newExecution.taskInstance});
        }
        return newExecutionTime;
    }

    @Override
    public void getScheduledExecutions(ScheduledExecutionsFilter filter, Consumer<Execution> consumer) {
        UnresolvedFilter unresolvedFilter = new UnresolvedFilter(this.taskResolver.getUnresolved());
        QueryBuilder q = this.queryForFilter(filter, this.orderByPriority);
        if (unresolvedFilter.isActive() && !filter.getIncludeUnresolved()) {
            q.andCondition(unresolvedFilter);
        }
        this.jdbcRunner.query(q.getQuery(), q.getPreparedStatementSetter(), new ExecutionResultSetConsumer(consumer, filter.getIncludeUnresolved(), false));
    }

    @Override
    public void getScheduledExecutions(ScheduledExecutionsFilter filter, String taskName, Consumer<Execution> consumer) {
        UnresolvedFilter unresolvedFilter = new UnresolvedFilter(this.taskResolver.getUnresolved());
        QueryBuilder q = this.queryForFilter(filter, this.orderByPriority);
        if (unresolvedFilter.isActive() && !filter.getIncludeUnresolved()) {
            q.andCondition(unresolvedFilter);
        }
        q.andCondition(new TaskCondition(taskName));
        this.jdbcRunner.query(q.getQuery(), q.getPreparedStatementSetter(), new ExecutionResultSetConsumer(consumer, filter.getIncludeUnresolved(), false));
    }

    @Override
    public List<Execution> getDue(Instant now, int limit) {
        LOG.trace("Using generic fetch-then-lock query");
        UnresolvedFilter unresolvedFilter = new UnresolvedFilter(this.taskResolver.getUnresolved());
        String selectDueQuery = this.jdbcCustomization.createSelectDueQuery(this.tableName, limit, unresolvedFilter.andCondition(), this.orderByPriority);
        return this.getExecutions(this.jdbcRunner, selectDueQuery, now, limit, unresolvedFilter);
    }

    @Override
    public List<Execution> lockAndFetchGeneric(Instant now, int limit) {
        return this.jdbcRunner.inTransaction(txRunner -> {
            UnresolvedFilter unresolvedFilter = new UnresolvedFilter(this.taskResolver.getUnresolved());
            String selectForUpdateQuery = this.jdbcCustomization.createGenericSelectForUpdateQuery(this.tableName, limit, unresolvedFilter.andCondition(), this.orderByPriority);
            List<Execution> candidates = this.getExecutions((JdbcRunner)txRunner, selectForUpdateQuery, now, limit, unresolvedFilter);
            if (candidates.isEmpty()) {
                return new ArrayList();
            }
            String pickedBy = StringUtils.truncate(this.schedulerSchedulerName.getName(), 50);
            Instant lastHeartbeat = this.clock.now();
            int[] updated = txRunner.executeBatch("update " + this.tableName + " set picked = ?, picked_by = ?, last_heartbeat = ?, version = version + 1  where task_name = ?  and task_instance = ? ", candidates, (value, ps) -> {
                ps.setBoolean(1, true);
                ps.setString(2, pickedBy);
                this.jdbcCustomization.setInstant(ps, 3, lastHeartbeat);
                ps.setString(4, value.taskInstance.getTaskName());
                ps.setString(5, value.taskInstance.getId());
            });
            int totalUpdated = IntStream.of(updated).sum();
            if (totalUpdated != candidates.size()) {
                LOG.error("Did not update same amount of executions that were locked in the transaction. This might mean some assumption is wrong here, or that transaction is not working. Needs to be investigated. Updated: {}, expected: {}", (Object)totalUpdated, (Object)candidates.size());
                ArrayList<Execution> locked = new ArrayList<Execution>();
                ArrayList<Execution> noLock = new ArrayList<Execution>();
                for (int i = 0; i < candidates.size(); ++i) {
                    if (updated[i] > 1) {
                        LOG.error("Should never happen, indicates a bug.");
                    }
                    if (updated[i] > 0) {
                        locked.add(candidates.get(i));
                        continue;
                    }
                    noLock.add(candidates.get(i));
                }
                String instancesNotLocked = noLock.stream().map(e -> e.taskInstance.toString()).collect(Collectors.joining(","));
                LOG.warn("Returning picked executions for processing. Did not manage to pick executions: {}", (Object)instancesNotLocked);
                return this.updateToPicked(locked, pickedBy, lastHeartbeat);
            }
            return this.updateToPicked(candidates, pickedBy, lastHeartbeat);
        });
    }

    private List<Execution> getExecutions(JdbcRunner jdbcRunner, String query, Instant now, int limit, UnresolvedFilter unresolvedFilter) {
        return jdbcRunner.query(query, p -> {
            int index = 1;
            p.setBoolean(index++, false);
            this.jdbcCustomization.setInstant(p, index++, now);
            unresolvedFilter.setParameters(p, index);
            if (!this.jdbcCustomization.supportsExplicitQueryLimitPart()) {
                p.setMaxRows(limit);
            }
        }, new ExecutionResultSetMapper(false, true));
    }

    private List<Execution> updateToPicked(List<Execution> executions, String pickedBy, Instant lastHeartbeat) {
        return executions.stream().map(old -> old.updateToPicked(pickedBy, lastHeartbeat)).collect(Collectors.toList());
    }

    @Override
    public List<Execution> lockAndGetDue(Instant now, int limit) {
        if (this.jdbcCustomization.supportsSingleStatementLockAndFetch()) {
            LOG.trace("Using single-statement lock-and-fetch");
            return this.jdbcCustomization.lockAndFetchSingleStatement(this.getTaskRespositoryContext(), now, limit, this.orderByPriority);
        }
        if (this.jdbcCustomization.supportsGenericLockAndFetch()) {
            LOG.trace("Using generic transaction-based lock-and-fetch");
            return this.lockAndFetchGeneric(now, limit);
        }
        throw new UnsupportedOperationException("The JdbcCustomization in use for the database indicates that it does not support SELECT FOR UPDATE .. SKIP LOCKED. If it indeed does, please indicate so in the JdbcCustomization.");
    }

    @Override
    public void remove(Execution execution) {
        int removed = this.jdbcRunner.execute("delete from " + this.tableName + " where task_name = ? and task_instance = ? and version = ?", ps -> {
            ps.setString(1, execution.taskInstance.getTaskName());
            ps.setString(2, execution.taskInstance.getId());
            ps.setLong(3, execution.version);
        });
        if (removed != 1) {
            throw new ExecutionException("Expected one execution to be removed, but removed " + removed + ". Indicates a bug.", execution);
        }
    }

    @Override
    public boolean reschedule(Execution execution, Instant nextExecutionTime, Instant lastSuccess, Instant lastFailure, int consecutiveFailures) {
        return this.rescheduleInternal(execution, nextExecutionTime, null, lastSuccess, lastFailure, consecutiveFailures);
    }

    @Override
    public boolean reschedule(Execution execution, Instant nextExecutionTime, Object newData, Instant lastSuccess, Instant lastFailure, int consecutiveFailures) {
        return this.rescheduleInternal(execution, nextExecutionTime, new NewData(newData), lastSuccess, lastFailure, consecutiveFailures);
    }

    private boolean rescheduleInternal(Execution execution, Instant nextExecutionTime, NewData newData, Instant lastSuccess, Instant lastFailure, int consecutiveFailures) {
        int updated = this.jdbcRunner.execute("update " + this.tableName + " set picked = ?, picked_by = ?, last_heartbeat = ?, last_success = ?, last_failure = ?, consecutive_failures = ?, execution_time = ?, " + (newData != null ? "task_data = ?, " : "") + "version = version + 1 where task_name = ? and task_instance = ? and version = ?", ps -> {
            int index = 1;
            ps.setBoolean(index++, false);
            ps.setString(index++, null);
            this.jdbcCustomization.setInstant(ps, index++, null);
            this.jdbcCustomization.setInstant(ps, index++, lastSuccess);
            this.jdbcCustomization.setInstant(ps, index++, lastFailure);
            ps.setInt(index++, consecutiveFailures);
            this.jdbcCustomization.setInstant(ps, index++, nextExecutionTime);
            if (newData != null) {
                this.jdbcCustomization.setTaskData(ps, index++, this.serializer.serialize(newData.data));
            }
            ps.setString(index++, execution.taskInstance.getTaskName());
            ps.setString(index++, execution.taskInstance.getId());
            ps.setLong(index, execution.version);
        });
        if (updated != 1) {
            throw new ExecutionException("Expected one execution to be updated, but updated " + updated + ". Indicates a bug.", execution);
        }
        return true;
    }

    @Override
    public Optional<Execution> pick(Execution e, Instant timePicked) {
        int updated = this.jdbcRunner.execute("update " + this.tableName + " set picked = ?, picked_by = ?, last_heartbeat = ?, version = version + 1 where picked = ? and task_name = ? and task_instance = ? and version = ?", ps -> {
            ps.setBoolean(1, true);
            ps.setString(2, StringUtils.truncate(this.schedulerSchedulerName.getName(), 50));
            this.jdbcCustomization.setInstant(ps, 3, timePicked);
            ps.setBoolean(4, false);
            ps.setString(5, e.taskInstance.getTaskName());
            ps.setString(6, e.taskInstance.getId());
            ps.setLong(7, e.version);
        });
        if (updated == 0) {
            LOG.trace("Failed to pick execution. It must have been picked by another scheduler. {}", (Object)e);
            return Optional.empty();
        }
        if (updated == 1) {
            Optional<Execution> pickedExecution = this.getExecution(e.taskInstance);
            if (pickedExecution.isEmpty()) {
                throw new IllegalStateException("Unable to find picked execution. Must have been deleted by another thread. Indicates a bug.");
            }
            if (!pickedExecution.get().isPicked()) {
                throw new IllegalStateException("Picked execution does not have expected state in database: " + String.valueOf(pickedExecution.get()));
            }
            return pickedExecution;
        }
        throw new IllegalStateException("Updated multiple rows when picking single execution. Should never happen since name and id is primary key. Execution: " + String.valueOf(e));
    }

    @Override
    public List<Execution> getDeadExecutions(Instant olderThan) {
        UnresolvedFilter unresolvedFilter = new UnresolvedFilter(this.taskResolver.getUnresolved());
        return this.jdbcRunner.query("select * from " + this.tableName + " where picked = ? and last_heartbeat <= ? " + unresolvedFilter.andCondition() + " order by last_heartbeat asc", p -> {
            int index = 1;
            p.setBoolean(index++, true);
            this.jdbcCustomization.setInstant(p, index++, olderThan);
            unresolvedFilter.setParameters(p, index);
        }, new ExecutionResultSetMapper(false, true));
    }

    @Override
    public boolean updateHeartbeatWithRetry(Execution execution, Instant newHeartbeat, int tries) {
        try {
            return this.updateHeartbeat(execution, newHeartbeat);
        }
        catch (RuntimeException e) {
            if (tries <= 1) {
                LOG.warn("Failed to update heartbeat. No more retries.", (Throwable)e);
                return false;
            }
            LOG.info("Failed to update heartbeat. Remaining retries={}.", (Object)(tries - 1), (Object)e);
            return this.updateHeartbeatWithRetry(execution, newHeartbeat, tries - 1);
        }
    }

    @Override
    public boolean updateHeartbeat(Execution e, Instant newHeartbeat) {
        int updated = this.jdbcRunner.execute("update " + this.tableName + " set last_heartbeat = ? where task_name = ? and task_instance = ? and version = ?", ps -> {
            this.jdbcCustomization.setInstant(ps, 1, newHeartbeat);
            ps.setString(2, e.taskInstance.getTaskName());
            ps.setString(3, e.taskInstance.getId());
            ps.setLong(4, e.version);
        });
        if (updated == 0) {
            LOG.debug("Did not update heartbeat. Execution must have been removed or rescheduled(i.e. CompletionHandler ran and finished just before heartbeat-update). This is a race-condition that may occur, but is very unlikely. task-instance={}", (Object)e.taskInstance);
            return false;
        }
        if (updated > 1) {
            LOG.error("Updated multiple rows updating heartbeat for execution. Should never happen since name and id is primary key. Execution: {}", (Object)e);
            return true;
        }
        LOG.debug("Updated heartbeat for execution: {}", (Object)e);
        return true;
    }

    @Override
    public List<Execution> getExecutionsFailingLongerThan(Duration interval) {
        UnresolvedFilter unresolvedFilter = new UnresolvedFilter(this.taskResolver.getUnresolved());
        return this.jdbcRunner.query("select * from " + this.tableName + " where     consecutive_failures > 0     and (last_success is null or last_success < ?) " + unresolvedFilter.andCondition(), p -> {
            int index = 1;
            this.jdbcCustomization.setInstant(p, index++, this.clock.now().minus(interval));
            unresolvedFilter.setParameters(p, index);
        }, new ExecutionResultSetMapper(false, false));
    }

    public Optional<Execution> getExecution(TaskInstance taskInstance) {
        return this.getExecution(taskInstance.getTaskName(), taskInstance.getId());
    }

    @Override
    public Optional<Execution> getExecution(String taskName, String taskInstanceId) {
        List<Execution> executions = this.jdbcRunner.query("select * from " + this.tableName + " where task_name = ? and task_instance = ?", p -> {
            p.setString(1, taskName);
            p.setString(2, taskInstanceId);
        }, new ExecutionResultSetMapper(true, false));
        if (executions.size() > 1) {
            throw new TaskInstanceException("Found more than one matching execution for task name/id combination.", taskName, taskInstanceId);
        }
        return executions.size() == 1 ? Optional.ofNullable(executions.get(0)) : Optional.empty();
    }

    @Override
    public int removeExecutions(String taskName) {
        return this.jdbcRunner.execute("delete from " + this.tableName + " where task_name = ?", p -> p.setString(1, taskName));
    }

    @Override
    public void verifySupportsLockAndFetch() {
        if (!this.jdbcCustomization.supportsSingleStatementLockAndFetch() && !this.jdbcCustomization.supportsGenericLockAndFetch()) {
            throw new IllegalArgumentException("Database using jdbc-customization '" + this.jdbcCustomization.getName() + "' does not support lock-and-fetch polling (i.e. Select-for-update)");
        }
    }

    private JdbcTaskRepositoryContext getTaskRespositoryContext() {
        return new JdbcTaskRepositoryContext(this.taskResolver, this.tableName, this.schedulerSchedulerName, this.jdbcRunner, () -> new ExecutionResultSetMapper(false, true));
    }

    private QueryBuilder queryForFilter(ScheduledExecutionsFilter filter, boolean orderByPriority) {
        QueryBuilder q = QueryBuilder.selectFromTable(this.tableName);
        filter.getPickedValue().ifPresent(value -> q.andCondition(new PickedCondition((boolean)value)));
        q.orderBy(orderByPriority ? "priority desc, execution_time asc" : "execution_time asc");
        return q;
    }

    private static <T> Supplier<T> memoize(final Supplier<T> original) {
        return new Supplier<T>(){
            boolean initialized;
            Supplier<T> delegate = this::firstTime;

            @Override
            public T get() {
                return this.delegate.get();
            }

            private synchronized T firstTime() {
                if (!this.initialized) {
                    Object value = original.get();
                    this.delegate = () -> value;
                    this.initialized = true;
                }
                return this.delegate.get();
            }
        };
    }

    static class UnresolvedFilter
    implements AndCondition {
        private final List<TaskResolver.UnresolvedTask> unresolved;

        public UnresolvedFilter(List<TaskResolver.UnresolvedTask> unresolved) {
            this.unresolved = unresolved;
        }

        public boolean isActive() {
            return !this.unresolved.isEmpty();
        }

        public String andCondition() {
            return this.unresolved.isEmpty() ? "" : "and " + this.getQueryPart();
        }

        @Override
        public String getQueryPart() {
            return "task_name not in (" + this.unresolved.stream().map(ignored -> "?").collect(Collectors.joining(",")) + ")";
        }

        @Override
        public int setParameters(PreparedStatement p, int index) throws SQLException {
            List unresolvedTasknames = this.unresolved.stream().map(TaskResolver.UnresolvedTask::getTaskName).collect(Collectors.toList());
            for (String taskName : unresolvedTasknames) {
                p.setString(index++, taskName);
            }
            return index;
        }
    }

    private class ExecutionResultSetConsumer
    implements ResultSetMapper<Void> {
        private final Consumer<Execution> consumer;
        private final boolean includeUnresolved;
        private final boolean addUnresolvedToExclusionFilter;

        private ExecutionResultSetConsumer(Consumer<Execution> consumer) {
            this(consumer, false, true);
        }

        private ExecutionResultSetConsumer(Consumer<Execution> consumer, boolean includeUnresolved, boolean addUnresolvedToExclusionFilter) {
            this.consumer = consumer;
            this.includeUnresolved = includeUnresolved;
            this.addUnresolvedToExclusionFilter = addUnresolvedToExclusionFilter;
        }

        @Override
        public Void map(ResultSet rs) throws SQLException {
            while (rs.next()) {
                String taskName = rs.getString("task_name");
                Optional<Task> task = JdbcTaskRepository.this.taskResolver.resolve(taskName, this.addUnresolvedToExclusionFilter);
                if (task.isEmpty() && !this.includeUnresolved) {
                    if (!this.addUnresolvedToExclusionFilter) continue;
                    LOG.warn("Failed to find implementation for task with name '{}'. Execution will be excluded from due. The scheduler normally delete unresolved tasks after 14d. To handle manually, either delete the execution from the database, or add an implementation for it. ", (Object)taskName);
                    continue;
                }
                String instanceId = rs.getString("task_instance");
                byte[] data = JdbcTaskRepository.this.jdbcCustomization.getTaskData(rs, "task_data");
                Instant executionTime = JdbcTaskRepository.this.jdbcCustomization.getInstant(rs, "execution_time");
                boolean picked = rs.getBoolean("picked");
                String pickedBy = rs.getString("picked_by");
                Instant lastSuccess = JdbcTaskRepository.this.jdbcCustomization.getInstant(rs, "last_success");
                Instant lastFailure = JdbcTaskRepository.this.jdbcCustomization.getInstant(rs, "last_failure");
                int consecutiveFailures = rs.getInt("consecutive_failures");
                Instant lastHeartbeat = JdbcTaskRepository.this.jdbcCustomization.getInstant(rs, "last_heartbeat");
                long version = rs.getLong("version");
                int priority = JdbcTaskRepository.this.orderByPriority ? rs.getInt("priority") : 0;
                Supplier<Object> dataSupplier = JdbcTaskRepository.memoize(() -> {
                    if (task.isEmpty()) {
                        return data;
                    }
                    return JdbcTaskRepository.this.serializer.deserialize(((Task)task.get()).getDataClass(), data);
                });
                this.consumer.accept(new Execution(executionTime, new TaskInstance<Object>(taskName, instanceId, dataSupplier, priority), picked, pickedBy, lastSuccess, lastFailure, consecutiveFailures, lastHeartbeat, version));
            }
            return null;
        }
    }

    private static class TaskCondition
    implements AndCondition {
        private final String value;

        public TaskCondition(String value) {
            this.value = value;
        }

        @Override
        public String getQueryPart() {
            return "task_name = ?";
        }

        @Override
        public int setParameters(PreparedStatement p, int index) throws SQLException {
            p.setString(index++, this.value);
            return index;
        }
    }

    private class ExecutionResultSetMapper
    implements ResultSetMapper<List<Execution>> {
        private final ArrayList<Execution> executions = new ArrayList();
        private final ExecutionResultSetConsumer delegate;

        private ExecutionResultSetMapper(boolean includeUnresolved, boolean addUnresolvedToExclusionFilter) {
            this.delegate = new ExecutionResultSetConsumer(this.executions::add, includeUnresolved, addUnresolvedToExclusionFilter);
        }

        @Override
        public List<Execution> map(ResultSet resultSet) throws SQLException {
            this.delegate.map(resultSet);
            return this.executions;
        }
    }

    private static class NewData {
        private final Object data;

        NewData(Object data) {
            this.data = data;
        }
    }

    private static class PickedCondition
    implements AndCondition {
        private final boolean value;

        public PickedCondition(boolean value) {
            this.value = value;
        }

        @Override
        public String getQueryPart() {
            return "picked = ?";
        }

        @Override
        public int setParameters(PreparedStatement p, int index) throws SQLException {
            p.setBoolean(index++, this.value);
            return index;
        }
    }
}

