/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.core.testutils.ScheduledTask;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerTest;
import org.apache.flink.runtime.scheduler.adaptive.MockStateWithoutExecutionGraphContext;
import org.apache.flink.runtime.scheduler.adaptive.State;
import org.apache.flink.runtime.scheduler.adaptive.StateValidator;
import org.apache.flink.runtime.scheduler.adaptive.WaitingForResources;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AtomicBooleanAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class WaitingForResourcesTest {
    private static final Logger LOG = LoggerFactory.getLogger(WaitingForResourcesTest.class);
    private static final Duration STABILIZATION_TIMEOUT = Duration.ofSeconds(1L);
    @RegisterExtension
    private MockContext ctx = new MockContext();

    WaitingForResourcesTest() {
    }

    @Test
    void testTransitionToCreatingExecutionGraph() {
        this.ctx.setHasDesiredResources(() -> true);
        this.ctx.setExpectCreatingExecutionGraph();
        new WaitingForResources((WaitingForResources.Context)this.ctx, LOG, Duration.ZERO, STABILIZATION_TIMEOUT);
        this.ctx.runScheduledTasks();
    }

    @Test
    void testNotEnoughResources() {
        this.ctx.setHasDesiredResources(() -> false);
        WaitingForResources wfr = new WaitingForResources((WaitingForResources.Context)this.ctx, LOG, Duration.ZERO, STABILIZATION_TIMEOUT);
        wfr.onNewResourcesAvailable();
    }

    @Test
    void testNotifyNewResourcesAvailable() {
        this.ctx.setHasDesiredResources(() -> false);
        WaitingForResources wfr = new WaitingForResources((WaitingForResources.Context)this.ctx, LOG, Duration.ZERO, STABILIZATION_TIMEOUT);
        this.ctx.setHasDesiredResources(() -> true);
        this.ctx.setExpectCreatingExecutionGraph();
        wfr.onNewResourcesAvailable();
    }

    @Test
    void testSchedulingWithSufficientResourcesAndNoStabilizationTimeout() {
        Duration noStabilizationTimeout = Duration.ofMillis(0L);
        WaitingForResources wfr = new WaitingForResources((WaitingForResources.Context)this.ctx, LOG, Duration.ofSeconds(1000L), noStabilizationTimeout);
        this.ctx.setHasDesiredResources(() -> false);
        this.ctx.setHasSufficientResources(() -> true);
        this.ctx.setExpectCreatingExecutionGraph();
        wfr.onNewResourcesAvailable();
    }

    @Test
    void testNoSchedulingIfStabilizationTimeoutIsConfigured() {
        Duration stabilizationTimeout = Duration.ofMillis(50000L);
        WaitingForResources wfr = new WaitingForResources((WaitingForResources.Context)this.ctx, LOG, Duration.ofSeconds(1000L), stabilizationTimeout);
        this.ctx.setHasDesiredResources(() -> false);
        this.ctx.setHasSufficientResources(() -> true);
        wfr.onNewResourcesAvailable();
        Assertions.assertThat((boolean)this.ctx.hasStateTransition()).isFalse();
    }

    @Test
    void testSchedulingWithSufficientResourcesAfterStabilizationTimeout() {
        Duration initialResourceTimeout = Duration.ofMillis(-1L);
        Duration stabilizationTimeout = Duration.ofMillis(50000L);
        WaitingForResources wfr = new WaitingForResources((WaitingForResources.Context)this.ctx, LOG, initialResourceTimeout, stabilizationTimeout, this.ctx.getClock(), null);
        this.ctx.setHasDesiredResources(() -> false);
        this.ctx.setHasSufficientResources(() -> true);
        wfr.onNewResourcesAvailable();
        this.ctx.setExpectCreatingExecutionGraph();
        Duration afterStabilizationTimeout = stabilizationTimeout.plusMillis(1L);
        this.ctx.advanceTimeByMillis(afterStabilizationTimeout.toMillis());
        this.ctx.runScheduledTasks(afterStabilizationTimeout.toMillis());
        Assertions.assertThat((boolean)this.ctx.hasStateTransition()).isTrue();
    }

    @Test
    void testStabilizationTimeoutReset() {
        Duration initialResourceTimeout = Duration.ofMillis(-1L);
        Duration stabilizationTimeout = Duration.ofMillis(50L);
        WaitingForResources wfr = new WaitingForResources((WaitingForResources.Context)this.ctx, LOG, initialResourceTimeout, stabilizationTimeout, this.ctx.getClock(), null);
        this.ctx.setHasDesiredResources(() -> false);
        this.ctx.setHasSufficientResources(() -> true);
        this.ctx.advanceTimeByMillis(40L);
        wfr.onNewResourcesAvailable();
        this.ctx.setHasSufficientResources(() -> false);
        this.ctx.advanceTimeByMillis(40L);
        wfr.onNewResourcesAvailable();
        this.ctx.setHasSufficientResources(() -> true);
        this.ctx.advanceTimeByMillis(40L);
        wfr.onNewResourcesAvailable();
        Assertions.assertThat((boolean)this.ctx.hasStateTransition()).isFalse();
        Assertions.assertThat((Duration)this.ctx.getTestDuration()).isGreaterThan((Comparable)stabilizationTimeout);
        this.ctx.setExpectCreatingExecutionGraph();
        this.ctx.advanceTimeByMillis(1L);
        Assertions.assertThat((boolean)this.ctx.hasStateTransition()).isFalse();
        this.ctx.advanceTimeByMillis(stabilizationTimeout.toMillis());
        Assertions.assertThat((boolean)this.ctx.hasStateTransition()).isTrue();
    }

    @Test
    void testNoStateTransitionOnNoResourceTimeout() {
        this.ctx.setHasDesiredResources(() -> false);
        WaitingForResources wfr = new WaitingForResources((WaitingForResources.Context)this.ctx, LOG, Duration.ofMillis(-1L), STABILIZATION_TIMEOUT);
        this.ctx.runScheduledTasks();
        Assertions.assertThat((boolean)this.ctx.hasStateTransition()).isFalse();
    }

    @Test
    void testStateTransitionOnResourceTimeout() {
        this.ctx.setHasDesiredResources(() -> false);
        WaitingForResources wfr = new WaitingForResources((WaitingForResources.Context)this.ctx, LOG, Duration.ZERO, STABILIZATION_TIMEOUT);
        this.ctx.setExpectCreatingExecutionGraph();
        this.ctx.runScheduledTasks();
    }

    @Test
    void testInternalRunScheduledTasks_correctExecutionOrder() {
        AtomicBoolean firstRun = new AtomicBoolean(false);
        AtomicBoolean secondRun = new AtomicBoolean(false);
        AtomicBoolean thirdRun = new AtomicBoolean(false);
        Runnable runFirstBecauseOfLowDelay = () -> firstRun.set(true);
        Runnable runSecondBecauseOfScheduleOrder = () -> {
            ((AtomicBooleanAssert)Assertions.assertThat((AtomicBoolean)firstRun).as("order violated", new Object[0])).isTrue();
            secondRun.set(true);
        };
        Runnable runLastBecauseOfHighDelay = () -> {
            ((AtomicBooleanAssert)Assertions.assertThat((AtomicBoolean)secondRun).as("order violated", new Object[0])).isTrue();
            thirdRun.set(true);
        };
        this.ctx.runIfState(new AdaptiveSchedulerTest.DummyState(), runLastBecauseOfHighDelay, Duration.ofMillis(999L));
        this.ctx.runIfState(new AdaptiveSchedulerTest.DummyState(), runFirstBecauseOfLowDelay, Duration.ZERO);
        this.ctx.runIfState(new AdaptiveSchedulerTest.DummyState(), runSecondBecauseOfScheduleOrder, Duration.ZERO);
        this.ctx.runScheduledTasks();
        Assertions.assertThat((AtomicBoolean)thirdRun).isTrue();
    }

    @Test
    void testInternalRunScheduledTasks_tasksAreRemovedAfterExecution() {
        AtomicBoolean executed = new AtomicBoolean(false);
        Runnable executeOnce = () -> {
            ((AtomicBooleanAssert)Assertions.assertThat((AtomicBoolean)executed).as("Multiple executions", new Object[0])).isFalse();
            executed.set(true);
        };
        this.ctx.runIfState(new AdaptiveSchedulerTest.DummyState(), executeOnce, Duration.ZERO);
        this.ctx.runScheduledTasks();
        this.ctx.runScheduledTasks();
        Assertions.assertThat((AtomicBoolean)executed).isTrue();
    }

    @Test
    void testInternalRunScheduledTasks_upperBoundRespected() {
        Runnable executeNever = () -> Assertions.fail((String)"Not expected");
        this.ctx.runIfState(new AdaptiveSchedulerTest.DummyState(), executeNever, Duration.ofMillis(10L));
        this.ctx.runScheduledTasks(4L);
    }

    @Test
    void testInternalRunScheduledTasks_scheduleTaskFromRunnable() {
        AdaptiveSchedulerTest.DummyState state = new AdaptiveSchedulerTest.DummyState();
        AtomicBoolean executed = new AtomicBoolean(false);
        this.ctx.runIfState(state, () -> this.ctx.runIfState(state, () -> executed.set(true), Duration.ofMillis(4L)), Duration.ZERO);
        this.ctx.runScheduledTasks(10L);
        Assertions.assertThat((AtomicBoolean)executed).isTrue();
    }

    static <T> Consumer<T> assertNonNull() {
        return item -> {
            ObjectAssert cfr_ignored_0 = (ObjectAssert)Assertions.assertThat((Object)item).isNotNull();
        };
    }

    private static final class ManualTestTime {
        private static final Logger LOG = LoggerFactory.getLogger(ManualTestTime.class);
        private final ManualClock testingClock = new ManualClock();
        private final Consumer<Duration> runOnAdvance;
        private Duration durationSinceTestStart = Duration.ZERO;

        private ManualTestTime(Consumer<Duration> runOnAdvance) {
            this.runOnAdvance = runOnAdvance;
        }

        private Clock getClock() {
            return this.testingClock;
        }

        public void advanceMillis(long millis) {
            this.durationSinceTestStart = this.durationSinceTestStart.plusMillis(millis);
            LOG.info("Advance testing time by {} ms to time {} ms", (Object)millis, (Object)this.durationSinceTestStart.toMillis());
            this.testingClock.advanceTime(millis, TimeUnit.MILLISECONDS);
            this.runOnAdvance.accept(this.durationSinceTestStart);
        }

        public Duration getTestDuration() {
            return this.durationSinceTestStart;
        }
    }

    private static class MockContext
    extends MockStateWithoutExecutionGraphContext
    implements WaitingForResources.Context {
        private static final Logger LOG = LoggerFactory.getLogger(MockContext.class);
        private final StateValidator<Void> creatingExecutionGraphStateValidator = new StateValidator("executing");
        private Supplier<Boolean> hasDesiredResourcesSupplier = () -> false;
        private Supplier<Boolean> hasSufficientResourcesSupplier = () -> false;
        private final Queue<ScheduledTask<Void>> scheduledTasks = new PriorityQueue<ScheduledTask>(Comparator.comparingLong(o -> o.getDelay(TimeUnit.MILLISECONDS)));
        private final ManualTestTime testTime = new ManualTestTime(durationSinceTestStart -> this.runScheduledTasks(durationSinceTestStart.toMillis()));

        private MockContext() {
        }

        public void setHasDesiredResources(Supplier<Boolean> sup) {
            this.hasDesiredResourcesSupplier = sup;
        }

        public void setHasSufficientResources(Supplier<Boolean> sup) {
            this.hasSufficientResourcesSupplier = sup;
        }

        void setExpectCreatingExecutionGraph() {
            this.creatingExecutionGraphStateValidator.expectInput(none -> {});
        }

        void runScheduledTasks(long untilDelay) {
            LOG.info("Running scheduled tasks with a delay between 0 and {}ms:", (Object)untilDelay);
            while (this.scheduledTasks.peek() != null && this.scheduledTasks.peek().getDelay(TimeUnit.MILLISECONDS) <= untilDelay) {
                ScheduledTask<Void> scheduledTask = this.scheduledTasks.poll();
                LOG.info("Running task with delay {}", (Object)scheduledTask.getDelay(TimeUnit.MILLISECONDS));
                scheduledTask.execute();
                if (!scheduledTask.isPeriodic()) continue;
                this.scheduledTasks.add(scheduledTask);
            }
        }

        void runScheduledTasks() {
            this.runScheduledTasks(Long.MAX_VALUE);
        }

        @Override
        public void afterEach(ExtensionContext extensionContext) throws Exception {
            super.afterEach(extensionContext);
            this.creatingExecutionGraphStateValidator.close();
        }

        public boolean hasDesiredResources() {
            return this.hasDesiredResourcesSupplier.get();
        }

        public boolean hasSufficientResources() {
            return this.hasSufficientResourcesSupplier.get();
        }

        public ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay) {
            LOG.info("Scheduling work with delay {} for earliest execution at {}", (Object)delay.toMillis(), (Object)(this.testTime.getClock().absoluteTimeMillis() + delay.toMillis()));
            ScheduledTask scheduledTask = new ScheduledTask(() -> {
                if (!this.hasStateTransition()) {
                    action.run();
                }
                return null;
            }, this.testTime.getClock().absoluteTimeMillis() + delay.toMillis());
            this.scheduledTasks.add((ScheduledTask<Void>)scheduledTask);
            return scheduledTask;
        }

        public void goToCreatingExecutionGraph(@Nullable ExecutionGraph previousExecutionGraph) {
            this.creatingExecutionGraphStateValidator.validateInput(null);
            this.registerStateTransition();
        }

        public Clock getClock() {
            return this.testTime.getClock();
        }

        public void advanceTimeByMillis(long millis) {
            this.testTime.advanceMillis(millis);
        }

        public Duration getTestDuration() {
            return this.testTime.getTestDuration();
        }
    }
}

