/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.metrics;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.metrics.ThresholdMeter;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class ThresholdMeterTest {
    private static final double THRESHOLD_LARGE = 1000.0;
    private static final double THRESHOLD_SMALL = 5.0;
    private static final Duration INTERVAL = Duration.ofMillis(50L);
    private static final long SLEEP = 10L;
    private static final double ERROR = 1.0E-6;
    private static ManualClock clock;

    ThresholdMeterTest() {
    }

    @BeforeEach
    void setup() {
        clock = new ManualClock(42000000L);
    }

    @Test
    void testMarkEvent() {
        ThresholdMeter thresholdMeter = ThresholdMeterTest.createLargeThresholdMeter();
        thresholdMeter.markEvent();
        clock.advanceTime(10L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)thresholdMeter.getCount()).isOne();
        Assertions.assertThat((double)thresholdMeter.getRate()).isCloseTo(ThresholdMeterTest.toPerSecondRate(1), Assertions.within((Double)1.0E-6));
        thresholdMeter.markEvent();
        Assertions.assertThat((long)thresholdMeter.getCount()).isEqualTo(2L);
        clock.advanceTime(10L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((double)thresholdMeter.getRate()).isCloseTo(ThresholdMeterTest.toPerSecondRate(2), Assertions.within((Double)1.0E-6));
    }

    @Test
    void testMarkMultipleEvents() {
        ThresholdMeter thresholdMeter = ThresholdMeterTest.createLargeThresholdMeter();
        thresholdMeter.markEvent(2L);
        clock.advanceTime(20L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)thresholdMeter.getCount()).isEqualTo(2L);
        Assertions.assertThat((double)thresholdMeter.getRate()).isCloseTo(ThresholdMeterTest.toPerSecondRate(2), Assertions.within((Double)1.0E-6));
    }

    @Test
    void testCheckAgainstThresholdNotExceeded() {
        ThresholdMeter thresholdMeter = ThresholdMeterTest.createSmallThresholdMeter();
        int i = 0;
        while ((double)i < 4.0) {
            thresholdMeter.markEvent();
            clock.advanceTime(10L, TimeUnit.MILLISECONDS);
            thresholdMeter.checkAgainstThreshold();
            ++i;
        }
    }

    @Test
    void testCheckAgainstThreshold() {
        ThresholdMeter thresholdMeter = ThresholdMeterTest.createSmallThresholdMeter();
        int i = 0;
        while ((double)i < 4.0) {
            thresholdMeter.markEvent();
            clock.advanceTime(10L, TimeUnit.MILLISECONDS);
            thresholdMeter.checkAgainstThreshold();
            ++i;
        }
        thresholdMeter.markEvent();
        Assertions.assertThatThrownBy(() -> thresholdMeter.checkAgainstThreshold()).isInstanceOf(ThresholdMeter.ThresholdExceedException.class);
    }

    @Test
    void testUpdateInterval() {
        ThresholdMeter thresholdMeter = ThresholdMeterTest.createSmallThresholdMeter();
        thresholdMeter.markEvent();
        clock.advanceTime(INTERVAL.toMillis() * 2L, TimeUnit.MILLISECONDS);
        int i = 0;
        while ((double)i < 4.0) {
            thresholdMeter.markEvent();
            ++i;
        }
        Assertions.assertThat((long)thresholdMeter.getCount()).isEqualTo(5L);
        Assertions.assertThat((double)thresholdMeter.getRate()).isCloseTo(ThresholdMeterTest.toPerSecondRate(4), Assertions.within((Double)1.0E-6));
        thresholdMeter.checkAgainstThreshold();
    }

    @Test
    void testConcurrentAccess() throws Exception {
        ThresholdMeter thresholdMeter = new ThresholdMeter(1000.0, INTERVAL);
        int repeatNum = 100;
        int concurrency = 2;
        ArrayList<Thread> threads = new ArrayList<Thread>();
        threads.addAll(ThresholdMeterTest.getConcurrentThreads(ThresholdMeterTest.repeat(() -> ((ThresholdMeter)thresholdMeter).markEvent(), 100), 2));
        threads.addAll(ThresholdMeterTest.getConcurrentThreads(ThresholdMeterTest.repeat(() -> ((ThresholdMeter)thresholdMeter).getRate(), 100), 2));
        threads.addAll(ThresholdMeterTest.getConcurrentThreads(ThresholdMeterTest.repeat(() -> ((ThresholdMeter)thresholdMeter).checkAgainstThreshold(), 100), 2));
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        Assertions.assertThat((long)thresholdMeter.getCount()).isEqualTo(200L);
    }

    private static Runnable repeat(Runnable task, int repeatNum) {
        return () -> {
            for (int i = 0; i < repeatNum; ++i) {
                task.run();
            }
        };
    }

    private static List<Thread> getConcurrentThreads(Runnable task, int concurrency) {
        ArrayList<Thread> threads = new ArrayList<Thread>();
        for (int i = 0; i < concurrency; ++i) {
            threads.add(new Thread(task));
        }
        return threads;
    }

    private static ThresholdMeter createLargeThresholdMeter() {
        return new ThresholdMeter(1000.0, INTERVAL, (Clock)clock);
    }

    private static ThresholdMeter createSmallThresholdMeter() {
        return new ThresholdMeter(5.0, INTERVAL, (Clock)clock);
    }

    private static double toPerSecondRate(int eventsPerInterval) {
        return (double)eventsPerInterval * 1000.0 / (double)INTERVAL.toMillis();
    }
}

