/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

class SystemProcessingTimeServiceTest {
    /** No-arg constructor; performs no initialization of its own. */
    SystemProcessingTimeServiceTest() {
        super();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Schedules a fixed-rate callback and waits until it has fired {@code countDown} times,
     * then checks that nothing was reported through the error callback wired to
     * {@code errorRef}.
     *
     * <p>The latch is awaited without a timeout of its own; the {@code @Timeout} on this
     * method (10 000 ms) bounds the wait.
     *
     * @throws Exception if waiting on the latch is interrupted
     */
    @Timeout(value = 10000L, unit = TimeUnit.MILLISECONDS)
    @Test
    void testScheduleAtFixedRate() throws Exception {
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        final long period = 10L;
        final int countDown = 3;

        final SystemProcessingTimeService timer = createSystemProcessingTimeService(errorRef);
        final CountDownLatch countDownLatch = new CountDownLatch(countDown);
        try {
            // Initial delay 0: the first invocation is due right away, later ones every `period`.
            timer.scheduleAtFixedRate(timestamp -> countDownLatch.countDown(), 0L, period);
            countDownLatch.await();

            Assertions.assertThat(errorRef.get()).isNull();
        } finally {
            timer.shutdownService();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that quiescing the service cancels a fixed-rate future that was scheduled before,
     * and that a fixed-rate task submitted after quiescing still yields a non-null future but
     * is not counted by {@code getNumTasksScheduled()} and reports no error.
     *
     * @throws Exception if waiting for the quiesce future fails
     */
    @Test
    void testQuiesceAndAwaitingCancelsScheduledAtFixRateFuture() throws Exception {
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        final long period = 10L;

        final SystemProcessingTimeService timer = createSystemProcessingTimeService(errorRef);
        try {
            final ScheduledFuture<?> beforeQuiesce =
                    timer.scheduleAtFixedRate(timestamp -> {}, 0L, period);
            // The explicit Future cast is kept from the original so the same assertThat
            // overload is selected.
            Assertions.assertThat((Future<?>) beforeQuiesce).isNotDone();

            // Block until quiescing has completed.
            timer.quiesce().get();

            Assertions.assertThatThrownBy(beforeQuiesce::get)
                    .as("scheduled future is not cancelled")
                    .isInstanceOf(CancellationException.class);

            // This callback would throw if it ever ran; the assertions below expect that
            // it is neither counted as scheduled nor reported as an error.
            final ScheduledFuture<?> afterQuiesce =
                    timer.scheduleAtFixedRate(
                            timestamp -> {
                                throw new Exception("Test exception.");
                            },
                            0L,
                            100L);

            Assertions.assertThat((Future<?>) afterQuiesce).isNotNull();
            Assertions.assertThat(timer.getNumTasksScheduled()).isZero();
            Assertions.assertThat(errorRef.get()).isNull();
        } finally {
            timer.shutdownService();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Shuts the service down while a timer callback is in the middle of a long sleep.
     *
     * <p>Afterwards the service must report itself terminated with zero scheduled tasks,
     * must reject new timers and fixed-rate tasks with {@link IllegalStateException}, and
     * the error callback must receive an {@link InterruptedException} within 30 seconds.
     *
     * @throws Exception if a latch or future wait fails
     */
    @Test
    void testImmediateShutdown() throws Exception {
        final CompletableFuture<Throwable> reportedError = new CompletableFuture<>();
        final SystemProcessingTimeService service = createSystemProcessingTimeService(reportedError);
        try {
            Assertions.assertThat(service.isTerminated()).isFalse();

            final OneShotLatch callbackEntered = new OneShotLatch();
            final long dueNow = System.currentTimeMillis();
            service.registerTimer(
                    dueNow,
                    ts -> {
                        callbackEntered.trigger();
                        Thread.sleep(100000000L);
                    });

            // Only shut down once the callback is known to be running.
            callbackEntered.await();
            service.shutdownService();

            Assertions.assertThat(service.isTerminated()).isTrue();
            Assertions.assertThat(service.getNumTasksScheduled()).isZero();

            Assertions.assertThatThrownBy(
                            () ->
                                    service.registerTimer(
                                            System.currentTimeMillis() + 1000L,
                                            ts -> Assertions.fail("should not be called")))
                    .isInstanceOf(IllegalStateException.class);
            Assertions.assertThatThrownBy(
                            () ->
                                    service.scheduleAtFixedRate(
                                            ts -> Assertions.fail("should not be called"),
                                            0L,
                                            100L))
                    .isInstanceOf(IllegalStateException.class);

            final Throwable received = reportedError.get(30L, TimeUnit.SECONDS);
            Assertions.assertThat(received).isInstanceOf(InterruptedException.class);
        } finally {
            service.shutdownService();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Quiesces the service while a timer callback is holding a lock.
     *
     * <p>The test asserts that once {@code quiesce().get()} returns the lock can be acquired,
     * and that a timer registered afterwards returns a non-null future, is not counted as
     * scheduled, and reports no error even though its callback would throw.
     *
     * @throws Exception if a latch or future wait fails
     */
    @Test
    void testQuiescing() throws Exception {
        final AtomicReference<Throwable> reportedError = new AtomicReference<>();
        final SystemProcessingTimeService service = createSystemProcessingTimeService(reportedError);
        try {
            final OneShotLatch callbackRunning = new OneShotLatch();
            final ReentrantLock callbackGuard = new ReentrantLock();

            final long firstDeadline = service.getCurrentProcessingTime() + 20L;
            service.registerTimer(
                    firstDeadline,
                    ts -> {
                        callbackGuard.lock();
                        try {
                            callbackRunning.trigger();
                            Thread.sleep(5L);
                        } finally {
                            callbackGuard.unlock();
                        }
                    });

            // Quiesce only after the callback has signalled from inside the locked section.
            callbackRunning.await();
            service.quiesce().get();

            final boolean lockAvailable = callbackGuard.tryLock();
            Assertions.assertThat(lockAvailable).isTrue();

            // Deadline already in the past; the callback would throw if it were executed.
            final long pastDeadline = service.getCurrentProcessingTime() - 5L;
            final ScheduledFuture<?> lateTimer =
                    service.registerTimer(
                            pastDeadline,
                            ts -> {
                                throw new Exception("test");
                            });

            Assertions.assertThat((Future<?>) lateTimer).isNotNull();
            Assertions.assertThat(service.getNumTasksScheduled()).isZero();
            Assertions.assertThat(reportedError.get()).isNull();
        } finally {
            service.shutdownService();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that cancelling a one-shot timer and a fixed-rate task each bring
     * {@code getNumTasksScheduled()} back from one to zero, without any error being reported.
     */
    @Test
    void testFutureCancellation() {
        final AtomicReference<Throwable> reportedError = new AtomicReference<>();
        final SystemProcessingTimeService service = createSystemProcessingTimeService(reportedError);
        try {
            Assertions.assertThat(service.getNumTasksScheduled()).isZero();

            // One-shot timer far enough in the future that it cannot fire during the test.
            final long farDeadline = System.currentTimeMillis() + 100000000L;
            final ScheduledFuture<?> oneShot = service.registerTimer(farDeadline, ts -> {});
            Assertions.assertThat(service.getNumTasksScheduled()).isOne();
            oneShot.cancel(false);
            Assertions.assertThat(service.getNumTasksScheduled()).isZero();

            // Fixed-rate task with a very large initial delay.
            final ScheduledFuture<?> periodic =
                    service.scheduleAtFixedRate(ts -> {}, 10000000000L, 50L);
            Assertions.assertThat(service.getNumTasksScheduled()).isOne();
            periodic.cancel(false);
            Assertions.assertThat(service.getNumTasksScheduled()).isZero();

            Assertions.assertThat(reportedError.get()).isNull();
        } finally {
            service.shutdownService();
        }
    }

    /**
     * Checks {@code shutdownAndAwaitPending}: it must return {@code false} while the timer
     * callback is still blocked on the latch, and {@code true} once the latch is released and
     * the callback has finished.
     *
     * <p>The {@code finally} block releases the latch and shuts the service down so that a
     * failing assertion does not leave the blocking callback parked forever.
     *
     * @throws Exception if waiting for shutdown is interrupted
     */
    @Test
    void testShutdownAndWaitPending() throws Exception {
        final OneShotLatch blockUntilTriggered = new OneShotLatch();
        final AtomicBoolean timerExecutionFinished = new AtomicBoolean(false);

        final SystemProcessingTimeService timeService =
                createBlockingSystemProcessingTimeService(
                        blockUntilTriggered, timerExecutionFinished);
        try {
            Assertions.assertThat(timeService.isTerminated()).isFalse();

            // The callback is still waiting on blockUntilTriggered, so this wait must time out.
            Assertions.assertThat(timeService.shutdownAndAwaitPending(1L, TimeUnit.SECONDS))
                    .isFalse();

            // Let the callback proceed.
            blockUntilTriggered.trigger();

            // Now the pending callback can complete and the shutdown wait succeeds.
            Assertions.assertThat(timeService.shutdownAndAwaitPending(60L, TimeUnit.SECONDS))
                    .isTrue();

            Assertions.assertThat(timerExecutionFinished).isTrue();
            Assertions.assertThat(timeService.isTerminated()).isTrue();
        } finally {
            // Cleanup for the failure path. NOTE(review): assumes OneShotLatch.trigger() may be
            // called more than once; shutdownService() is already invoked repeatedly in
            // testImmediateShutdown.
            blockUntilTriggered.trigger();
            timeService.shutdownService();
        }
    }

    /**
     * Checks {@code shutdownServiceUninterruptible} while the calling thread is being
     * interrupted continuously by a helper thread.
     *
     * <p>With the timer callback still blocked, the call must return {@code false}, must leave
     * the service terminated, and must not return before the timeout has elapsed. After the
     * latch is released, a second call must return {@code true} and the callback must have
     * finished.
     *
     * <p>The helper thread is stopped and the blocking callback released in {@code finally}
     * blocks, so a failing assertion cannot leave a thread behind that keeps interrupting the
     * test thread.
     */
    @Test
    void testShutdownServiceUninterruptible() {
        final long timeoutMs = 50L;
        final OneShotLatch blockUntilTriggered = new OneShotLatch();
        final AtomicBoolean timerFinished = new AtomicBoolean(false);

        final SystemProcessingTimeService timeService =
                createBlockingSystemProcessingTimeService(blockUntilTriggered, timerFinished);
        try {
            Assertions.assertThat(timeService.isTerminated()).isFalse();

            final Thread interruptTarget = Thread.currentThread();
            final AtomicBoolean runInterrupts = new AtomicBoolean(true);
            final Thread interruptCallerThread =
                    new Thread(
                            () -> {
                                while (runInterrupts.get()) {
                                    interruptTarget.interrupt();
                                    try {
                                        Thread.sleep(1L);
                                    } catch (InterruptedException ignored) {
                                        // Nothing to do: the loop re-checks runInterrupts.
                                    }
                                }
                            });
            interruptCallerThread.start();

            try {
                final long startTime = System.nanoTime();
                Assertions.assertThat(timeService.isTerminated()).isFalse();

                // The callback is still blocked, so the shutdown wait has to time out.
                Assertions.assertThat(timeService.shutdownServiceUninterruptible(timeoutMs))
                        .isFalse();
                Assertions.assertThat(timeService.isTerminated()).isTrue();
                Assertions.assertThat(timerFinished).isFalse();

                // The interrupts must not have cut the wait short.
                Assertions.assertThat(System.nanoTime() - startTime)
                        .isGreaterThanOrEqualTo(TimeUnit.MILLISECONDS.toNanos(timeoutMs));
            } finally {
                // Always stop the interrupter, even when an assertion above failed.
                runInterrupts.set(false);
                do {
                    try {
                        interruptCallerThread.join();
                    } catch (InterruptedException ignored) {
                        // A late interrupt from the helper may land here; retry the join.
                    }
                } while (interruptCallerThread.isAlive());

                // Clear any interrupt flag the helper left on the test thread.
                Thread.interrupted();
            }

            blockUntilTriggered.trigger();
            Assertions.assertThat(timeService.shutdownServiceUninterruptible(timeoutMs)).isTrue();
            Assertions.assertThat(timerFinished).isTrue();
        } finally {
            // Cleanup for the failure path. NOTE(review): assumes OneShotLatch.trigger() may be
            // called more than once; shutdownService() is already invoked repeatedly in
            // testImmediateShutdown.
            blockUntilTriggered.trigger();
            timeService.shutdownService();
        }
    }

    /**
     * Creates a {@link SystemProcessingTimeService} whose error callback completes the given
     * future with the reported throwable.
     *
     * @param errorFuture future to complete on error; must not be done yet
     * @return the new service
     */
    private static SystemProcessingTimeService createSystemProcessingTimeService(CompletableFuture<Throwable> errorFuture) {
        final boolean stillPending = !errorFuture.isDone();
        Preconditions.checkArgument(stillPending);
        return new SystemProcessingTimeService(error -> errorFuture.complete(error));
    }

    /**
     * Creates a {@link SystemProcessingTimeService} whose error callback stores the reported
     * throwable in the given reference, keeping only the first one (later ones lose the
     * compare-and-set against {@code null}).
     *
     * @param errorRef reference to fill on error; must currently hold {@code null}
     * @return the new service
     */
    private static SystemProcessingTimeService createSystemProcessingTimeService(AtomicReference<Throwable> errorRef) {
        final boolean empty = errorRef.get() == null;
        Preconditions.checkArgument(empty);
        return new SystemProcessingTimeService(error -> errorRef.compareAndSet(null, error));
    }

    /**
     * Creates a {@link SystemProcessingTimeService} with a fixed-rate callback that blocks
     * until {@code blockUntilTriggered} is triggered and then sets {@code check} to
     * {@code true}. The method returns only after the callback has started running.
     *
     * @param blockUntilTriggered latch the callback waits on
     * @param check flag set by the callback once it is unblocked; must be {@code false} on entry
     * @return the service, with its callback already running and blocked on the latch
     */
    private static SystemProcessingTimeService createBlockingSystemProcessingTimeService(OneShotLatch blockUntilTriggered, AtomicBoolean check) {
        final OneShotLatch waitUntilTimerStarted = new OneShotLatch();
        Preconditions.checkState(!check.get());

        // Errors reported by the service are deliberately ignored by this handler.
        final SystemProcessingTimeService timeService = new SystemProcessingTimeService(exception -> {});

        timeService.scheduleAtFixedRate(
                timestamp -> {
                    waitUntilTimerStarted.trigger();

                    boolean unblocked = false;
                    while (!unblocked) {
                        try {
                            blockUntilTriggered.await();
                            unblocked = true;
                        } catch (InterruptedException ignored) {
                            // Intentionally retried: the callback must keep blocking until the
                            // latch is triggered, even if the timer thread gets interrupted.
                        }
                    }

                    check.set(true);
                },
                0L,
                10L);

        try {
            waitUntilTimerStarted.await();
        } catch (InterruptedException e) {
            // Keep the interruption as the cause so the failure report shows what happened.
            Assertions.fail("Problem while starting up service.", e);
        }
        return timeService;
    }
}

