/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.kernel.impl.scheduler;

import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.neo4j.kernel.impl.scheduler.ThreadPoolManager;
import org.neo4j.kernel.impl.scheduler.TimeBasedTaskScheduler;
import org.neo4j.scheduler.CancelListener;
import org.neo4j.scheduler.Group;
import org.neo4j.scheduler.JobHandle;
import org.neo4j.time.FakeClock;
import org.neo4j.time.SystemNanoClock;
import org.neo4j.util.concurrent.BinaryLatch;

class TimeBasedTaskSchedulerTest {
    private FakeClock clock;
    private ThreadPoolManager pools;
    private TimeBasedTaskScheduler scheduler;
    private AtomicInteger counter;
    private Semaphore semaphore;

    // Package-private no-arg constructor with an empty body; the fixture fields are
    // assigned in setUp() before each test rather than here.
    TimeBasedTaskSchedulerTest() {
    }

    /**
     * Builds a fresh fixture before every test: a counter starting at zero, a semaphore with
     * no permits, a {@link FakeClock}, a {@link ThreadPoolManager} constructed with a thread
     * group named "TestPool", and the scheduler under test wired to that clock and those pools.
     */
    @BeforeEach
    void setUp() {
        // Plain helpers observed by the test bodies.
        counter = new AtomicInteger();
        semaphore = new Semaphore(0);

        // Collaborators of the scheduler under test.
        clock = new FakeClock();
        pools = new ThreadPoolManager(new ThreadGroup("TestPool"));
        scheduler = new TimeBasedTaskScheduler((SystemNanoClock) clock, pools);
    }

    /**
     * Shuts down all thread pools after each test. {@code shutDownAll()} returns an
     * {@link InterruptedException} rather than throwing it; a non-null result is rethrown
     * wrapped in a {@link RuntimeException} so the interruption is not silently lost.
     */
    @AfterEach
    void tearDown() {
        InterruptedException interruption = pools.shutDownAll();
        if (interruption == null) {
            return;
        }
        throw new RuntimeException("Test was interrupted?", interruption);
    }

    /**
     * Waits for one permit on the shared semaphore, failing the test if none arrives.
     *
     * <p>Polls in 10 ms slices for up to 10 seconds' worth of slices (1000 attempts). After
     * every unsuccessful slice the scheduler is ticked again before the next attempt.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private void assertSemaphoreAcquire() throws InterruptedException {
        final long pollMillis = 10L;
        final long maxAttempts = TimeUnit.SECONDS.toMillis(10L) / pollMillis;
        for (long attempt = 0L; attempt < maxAttempts; attempt++) {
            if (semaphore.tryAcquire(pollMillis, TimeUnit.MILLISECONDS)) {
                return;
            }
            // No permit yet: tick once more before polling again.
            scheduler.tick();
        }
        org.junit.jupiter.api.Assertions.fail("Semaphore acquire timeout");
    }

    /**
     * A task submitted with a delay argument of 100 (and 0 as the last argument) must not run
     * until the fake clock has been advanced by the full 100 ns.
     */
    @Test
    void mustDelayExecution() throws Exception {
        JobHandle delayed = scheduler.submit(Group.STORAGE_MAINTENANCE, counter::incrementAndGet, 100L, 0L);

        // Clock untouched: nothing may have run.
        scheduler.tick();
        Assertions.assertThat(counter.get()).isEqualTo(0);

        // 99 ns in: still one nanosecond short.
        clock.forward(99L, TimeUnit.NANOSECONDS);
        scheduler.tick();
        Assertions.assertThat(counter.get()).isEqualTo(0);

        // 100 ns in: the task is expected to run exactly once.
        clock.forward(1L, TimeUnit.NANOSECONDS);
        scheduler.tick();
        delayed.waitTermination();
        Assertions.assertThat(counter.get()).isEqualTo(1);
    }

    /**
     * With two tasks submitted at delays 100 and 200, a tick at 199 ns must run only the first
     * (counter becomes 10); the second runs once the clock reaches 200 ns (counter becomes 110).
     */
    @Test
    void mustOnlyScheduleTasksThatAreDue() throws Exception {
        JobHandle dueAt100 = scheduler.submit(Group.STORAGE_MAINTENANCE, () -> counter.addAndGet(10), 100L, 0L);
        JobHandle dueAt200 = scheduler.submit(Group.STORAGE_MAINTENANCE, () -> counter.addAndGet(100), 200L, 0L);

        // Clock untouched: neither task may have run.
        scheduler.tick();
        Assertions.assertThat(counter.get()).isEqualTo(0);

        // 199 ns in: only the first task has contributed.
        clock.forward(199L, TimeUnit.NANOSECONDS);
        scheduler.tick();
        dueAt100.waitTermination();
        Assertions.assertThat(counter.get()).isEqualTo(10);

        // 200 ns in: the second task contributes as well.
        clock.forward(1L, TimeUnit.NANOSECONDS);
        scheduler.tick();
        dueAt200.waitTermination();
        Assertions.assertThat(counter.get()).isEqualTo(110);
    }

    /**
     * A task submitted with 0 as the last argument must run once only: after its first run,
     * advancing the clock by another 100 ns and ticking again must leave the counter at 1.
     */
    @Test
    void mustNotRescheduleDelayedTasks() throws Exception {
        JobHandle oneShot = scheduler.submit(Group.STORAGE_MAINTENANCE, counter::incrementAndGet, 100L, 0L);

        // First window: the task runs.
        clock.forward(100L, TimeUnit.NANOSECONDS);
        scheduler.tick();
        oneShot.waitTermination();
        Assertions.assertThat(counter.get()).isEqualTo(1);

        // Second window: no further run is expected.
        clock.forward(100L, TimeUnit.NANOSECONDS);
        scheduler.tick();
        oneShot.waitTermination();

        // The group's pool is shut down before the final count is checked.
        pools.getThreadPool(Group.STORAGE_MAINTENANCE).shutDown();
        Assertions.assertThat(counter.get()).isEqualTo(1);
    }

    /**
     * A task submitted with 100 for both trailing arguments must run again after each further
     * 100 ns: every clock advance plus tick has to yield one semaphore permit.
     */
    @Test
    void mustRescheduleRecurringTasks() throws Exception {
        scheduler.submit(Group.STORAGE_MAINTENANCE, semaphore::release, 100L, 100L);

        // First run.
        clock.forward(100L, TimeUnit.NANOSECONDS);
        scheduler.tick();
        assertSemaphoreAcquire();

        // Second run, one further 100 ns step later.
        clock.forward(100L, TimeUnit.NANOSECONDS);
        scheduler.tick();
        assertSemaphoreAcquire();
    }

    /**
     * A recurring task that throws on every run must still be run again: the task has to be
     * executed 20 times in total.
     *
     * <p>The wait for those executions is bounded to one minute so that a scheduler which stops
     * re-running the task fails this test instead of hanging the build.
     */
    @Test
    void mustRescheduleRecurringTasksThatThrows() throws Exception {
        CountDownLatch executionCountDown = new CountDownLatch(20);
        Runnable runnable = () -> {
            try {
                semaphore.release();
                throw new RuntimeException("boom");
            } finally {
                // Count the execution even though it ends exceptionally.
                executionCountDown.countDown();
            }
        };
        scheduler.submit(Group.STORAGE_MAINTENANCE, runnable, 10L, 10L);

        // First execution.
        clock.forward(100L, TimeUnit.NANOSECONDS);
        scheduler.tick();
        assertSemaphoreAcquire();

        // Keep advancing time and ticking until all 20 executions were observed.
        long deadline = System.nanoTime() + TimeUnit.MINUTES.toNanos(1L);
        do {
            // Difference-based comparison, as recommended for System.nanoTime values.
            if (System.nanoTime() - deadline >= 0L) {
                org.junit.jupiter.api.Assertions.fail(
                        "Throwing task was not re-executed in time; executions still missing: "
                                + executionCountDown.getCount());
            }
            clock.forward(100L, TimeUnit.NANOSECONDS);
            scheduler.tick();
        } while (!executionCountDown.await(1L, TimeUnit.MILLISECONDS));
    }

    /**
     * Repeated 100 times: a throwing recurring task (delay 10, last argument 10) is ticked once
     * after the clock jumps 20 ns. By the time the first permit is observed the task body must
     * have been entered at most twice.
     *
     * @throws InterruptedException if interrupted while waiting for the permit
     */
    @RepeatedTest(value = 100)
    void ensureRescheduledThrowingTasksAreRescheduledCorrectly() throws InterruptedException {
        AtomicInteger timesScheduled = new AtomicInteger(0);
        Runnable runnable = () -> {
            timesScheduled.incrementAndGet();
            semaphore.release();
            throw new RuntimeException("boom");
        };
        // The returned handle is not needed by this test.
        scheduler.submit(Group.STORAGE_MAINTENANCE, runnable, 10L, 10L);

        clock.forward(20L, TimeUnit.NANOSECONDS);
        scheduler.tick();
        assertSemaphoreAcquire();

        Assertions.assertThat(timesScheduled.get()).isLessThanOrEqualTo(2);
    }

    /**
     * While one run of a recurring task is still blocked on the semaphore, further ticks must
     * not start another run: after four tick/advance rounds the counter must still be 1.
     * The whole scenario is aborted if it takes longer than one minute.
     */
    @Test
    void mustNotStartRecurringTasksWherePriorExecutionHasNotYetFinished() {
        org.junit.jupiter.api.Assertions.assertTimeoutPreemptively(Duration.ofMinutes(1L), () -> {
            Runnable blockingTask = () -> {
                counter.incrementAndGet();
                semaphore.acquireUninterruptibly();
            };
            scheduler.submit(Group.STORAGE_MAINTENANCE, blockingTask, 100L, 100L);

            // Note the order: tick first, then advance the clock.
            for (int round = 0; round < 4; round++) {
                scheduler.tick();
                clock.forward(100L, TimeUnit.NANOSECONDS);
            }

            // Wait until a thread is actually queued on the semaphore.
            while (!semaphore.hasQueuedThreads()) {
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
            }

            // Unblock the waiting run, shut the pool down and verify only one run happened.
            semaphore.release(Integer.MAX_VALUE);
            pools.getThreadPool(Group.STORAGE_MAINTENANCE).shutDown();
            Assertions.assertThat(counter.get()).isEqualTo(1);
        });
    }

    /**
     * A recurring task that blocks on a latch must not prevent a second recurring task in the
     * same group from running on each of four consecutive 100 ns steps.
     *
     * <p>The latch is released in a {@code finally} block so the blocked task is let go even
     * when one of the permit assertions fails.
     */
    @Test
    void longRunningTasksMustNotDelayExecutionOfOtherTasks() throws Exception {
        BinaryLatch latch = new BinaryLatch();
        Runnable longRunning = () -> latch.await();
        Runnable shortRunning = semaphore::release;
        scheduler.submit(Group.STORAGE_MAINTENANCE, longRunning, 100L, 100L);
        scheduler.submit(Group.STORAGE_MAINTENANCE, shortRunning, 100L, 100L);
        try {
            for (int i = 0; i < 4; ++i) {
                clock.forward(100L, TimeUnit.NANOSECONDS);
                scheduler.tick();
                // The short task must deliver a permit on every step.
                assertSemaphoreAcquire();
            }
        } finally {
            // Always unblock the long-running task, on success and on failure.
            latch.release();
        }
    }

    /**
     * A delayed task cancelled at 90 ns (before its delay of 100 elapses) must never run, its
     * cancel listener must have been notified, and waiting on the handle must throw
     * {@link CancellationException}.
     */
    @Test
    void delayedTasksMustNotRunIfCancelledFirst() {
        MonitoredCancelListener listener = new MonitoredCancelListener();
        JobHandle delayed = scheduler.submit(Group.STORAGE_MAINTENANCE, counter::incrementAndGet, 100L, 0L);
        delayed.registerCancelListener(listener);

        // Cancel 10 ns before the task would become due.
        clock.forward(90L, TimeUnit.NANOSECONDS);
        scheduler.tick();
        delayed.cancel();

        // Pass the original due time and tick again.
        clock.forward(10L, TimeUnit.NANOSECONDS);
        scheduler.tick();
        pools.getThreadPool(Group.STORAGE_MAINTENANCE).shutDown();

        Assertions.assertThat(counter.get()).isEqualTo(0);
        org.junit.jupiter.api.Assertions.assertTrue(listener.isCanceled());
        org.junit.jupiter.api.Assertions.assertThrows(CancellationException.class, delayed::waitTermination);
    }

    /**
     * A recurring task cancelled after two runs must not run again on later ticks, and its
     * cancel listener must have been notified.
     *
     * @throws InterruptedException if interrupted while waiting for a permit
     */
    @Test
    void recurringTasksMustStopWhenCancelled() throws InterruptedException {
        MonitoredCancelListener listener = new MonitoredCancelListener();
        Runnable countingTask = () -> {
            counter.incrementAndGet();
            semaphore.release();
        };
        JobHandle recurringHandle = scheduler.submit(Group.STORAGE_MAINTENANCE, countingTask, 100L, 100L);
        recurringHandle.registerCancelListener(listener);

        // First run.
        clock.forward(100L, TimeUnit.NANOSECONDS);
        scheduler.tick();
        assertSemaphoreAcquire();

        // Second run.
        clock.forward(100L, TimeUnit.NANOSECONDS);
        scheduler.tick();
        assertSemaphoreAcquire();

        recurringHandle.cancel();

        // Two more 100 ns steps after cancellation; neither may trigger a run.
        clock.forward(100L, TimeUnit.NANOSECONDS);
        scheduler.tick();
        clock.forward(100L, TimeUnit.NANOSECONDS);
        scheduler.tick();
        pools.getThreadPool(Group.STORAGE_MAINTENANCE).shutDown();

        Assertions.assertThat(counter.get()).isEqualTo(2);
        org.junit.jupiter.api.Assertions.assertTrue(listener.isCanceled());
    }

    /**
     * A cancelled recurring task stays counted by {@code tasksLeft()} until the next tick, which
     * must drop it; the task must not run again afterwards.
     *
     * <p>The wait for {@code tasksLeft()} to become non-zero is bounded to 10 seconds so that a
     * regression fails the test instead of spinning forever.
     */
    @Test
    void cleanupCanceledHandles() {
        Runnable recurring = () -> counter.incrementAndGet();
        JobHandle handle = scheduler.submit(Group.STORAGE_MAINTENANCE, recurring, 0L, 100L);
        scheduler.tick();

        // Spin until the scheduler reports a pending task again.
        // NOTE(review): presumably the task is off the queue while it runs and is re-added when
        // it finishes - confirm against TimeBasedTaskScheduler.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10L);
        while (scheduler.tasksLeft() == 0) {
            // Difference-based comparison, as recommended for System.nanoTime values.
            if (System.nanoTime() - deadline >= 0L) {
                org.junit.jupiter.api.Assertions.fail("Scheduler reported no pending task within 10 seconds");
            }
            Thread.yield();
        }
        Assertions.assertThat(counter.get()).isEqualTo(1);

        // Cancelling alone does not remove the task from the scheduler...
        handle.cancel();
        org.junit.jupiter.api.Assertions.assertEquals(1, (int) scheduler.tasksLeft());

        // ...the next tick after it becomes due does.
        clock.forward(100L, TimeUnit.NANOSECONDS);
        scheduler.tick();
        org.junit.jupiter.api.Assertions.assertEquals(0, (int) scheduler.tasksLeft());

        pools.getThreadPool(Group.STORAGE_MAINTENANCE).shutDown();
        Assertions.assertThat(counter.get()).isEqualTo(1);
    }

    /**
     * A recurring task whose previous run is still blocked while further 100 ns steps pass must
     * start its next run once the blocked run is released: the counter has to reach 2 within
     * 10 seconds.
     *
     * <p>Cleanup runs in a {@code finally} block. The task blocks in
     * {@code acquireUninterruptibly()}, so without releasing permits on a failed assertion a
     * worker would stay blocked, and the handle would never be cancelled.
     */
    @Test
    void overdueRecurringTasksMustStartAsSoonAsPossible() {
        Runnable recurring = () -> {
            counter.incrementAndGet();
            semaphore.acquireUninterruptibly();
        };
        JobHandle handle = scheduler.submit(Group.STORAGE_MAINTENANCE, recurring, 100L, 100L);
        try {
            clock.forward(100L, TimeUnit.NANOSECONDS);
            scheduler.tick();

            // Wait (bounded) until the first run has started.
            long startDeadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10L);
            while (counter.get() < 1) {
                if (System.nanoTime() - startDeadline >= 0L) {
                    org.junit.jupiter.api.Assertions.fail("First execution did not start within 10 seconds");
                }
                Thread.yield();
            }

            // Further steps pass while the first run is still blocked on the semaphore.
            clock.forward(100L, TimeUnit.NANOSECONDS);
            scheduler.tick();
            clock.forward(100L, TimeUnit.NANOSECONDS);

            // Let the first run finish, then keep ticking until the next run starts.
            semaphore.release();
            scheduler.tick();
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10L);
            // Difference-based comparison, as recommended for System.nanoTime values.
            while (counter.get() < 2 && System.nanoTime() - deadline < 0L) {
                scheduler.tick();
                Thread.yield();
            }
            Assertions.assertThat(counter.get()).isEqualTo(2);
        } finally {
            // Drain first so the bulk release cannot exceed the maximum permit count when a
            // permit from the single release above is still unconsumed on a failure path.
            semaphore.drainPermits();
            semaphore.release(Integer.MAX_VALUE);
            handle.cancel();
        }
    }

    private static class MonitoredCancelListener
    implements CancelListener {
        private boolean canceled;

        private MonitoredCancelListener() {
        }

        public void cancelled() {
            this.canceled = true;
        }

        boolean isCanceled() {
            return this.canceled;
        }
    }
}

