package org.apache.flink.runtime.metrics;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.metrics.ThresholdMeter;
import org.apache.flink.util.clock.ManualClock;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/metrics/ThresholdMeterTest.class */
class ThresholdMeterTest {
    private static final double THRESHOLD_LARGE = 1000.0d;
    private static final double THRESHOLD_SMALL = 5.0d;
    private static final Duration INTERVAL = Duration.ofMillis(50);
    private static final long SLEEP = 10;
    private static final double ERROR = 1.0E-6d;
    private static ManualClock clock;

    ThresholdMeterTest() {
    }

    @BeforeEach
    void setup() {
        clock = new ManualClock(42000000L);
    }

    @Test
    void testMarkEvent() {
        ThresholdMeter createLargeThresholdMeter = createLargeThresholdMeter();
        createLargeThresholdMeter.markEvent();
        clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS);
        Assertions.assertThat(createLargeThresholdMeter.getCount()).isOne();
        Assertions.assertThat(createLargeThresholdMeter.getRate()).isCloseTo(toPerSecondRate(1), Assertions.within(Double.valueOf(ERROR)));
        createLargeThresholdMeter.markEvent();
        Assertions.assertThat(createLargeThresholdMeter.getCount()).isEqualTo(2L);
        clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS);
        Assertions.assertThat(createLargeThresholdMeter.getRate()).isCloseTo(toPerSecondRate(2), Assertions.within(Double.valueOf(ERROR)));
    }

    @Test
    void testMarkMultipleEvents() {
        ThresholdMeter createLargeThresholdMeter = createLargeThresholdMeter();
        createLargeThresholdMeter.markEvent(2L);
        clock.advanceTime(20L, TimeUnit.MILLISECONDS);
        Assertions.assertThat(createLargeThresholdMeter.getCount()).isEqualTo(2L);
        Assertions.assertThat(createLargeThresholdMeter.getRate()).isCloseTo(toPerSecondRate(2), Assertions.within(Double.valueOf(ERROR)));
    }

    @Test
    void testCheckAgainstThresholdNotExceeded() {
        ThresholdMeter createSmallThresholdMeter = createSmallThresholdMeter();
        for (int i = 0; i < 4.0d; i++) {
            createSmallThresholdMeter.markEvent();
            clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS);
            createSmallThresholdMeter.checkAgainstThreshold();
        }
    }

    @Test
    void testCheckAgainstThreshold() {
        ThresholdMeter createSmallThresholdMeter = createSmallThresholdMeter();
        for (int i = 0; i < 4.0d; i++) {
            createSmallThresholdMeter.markEvent();
            clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS);
            createSmallThresholdMeter.checkAgainstThreshold();
        }
        createSmallThresholdMeter.markEvent();
        Assertions.assertThatThrownBy(() -> {
            createSmallThresholdMeter.checkAgainstThreshold();
        }).isInstanceOf(ThresholdMeter.ThresholdExceedException.class);
    }

    @Test
    void testUpdateInterval() {
        ThresholdMeter createSmallThresholdMeter = createSmallThresholdMeter();
        createSmallThresholdMeter.markEvent();
        clock.advanceTime(INTERVAL.toMillis() * 2, TimeUnit.MILLISECONDS);
        for (int i = 0; i < 4.0d; i++) {
            createSmallThresholdMeter.markEvent();
        }
        Assertions.assertThat(createSmallThresholdMeter.getCount()).isEqualTo(5L);
        Assertions.assertThat(createSmallThresholdMeter.getRate()).isCloseTo(toPerSecondRate(4), Assertions.within(Double.valueOf(ERROR)));
        createSmallThresholdMeter.checkAgainstThreshold();
    }

    @Test
    void testConcurrentAccess() throws Exception {
        ThresholdMeter thresholdMeter = new ThresholdMeter(THRESHOLD_LARGE, INTERVAL);
        ArrayList arrayList = new ArrayList();
        thresholdMeter.getClass();
        arrayList.addAll(getConcurrentThreads(repeat(thresholdMeter::markEvent, 100), 2));
        thresholdMeter.getClass();
        arrayList.addAll(getConcurrentThreads(repeat(thresholdMeter::getRate, 100), 2));
        thresholdMeter.getClass();
        arrayList.addAll(getConcurrentThreads(repeat(thresholdMeter::checkAgainstThreshold, 100), 2));
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((Thread) it.next()).start();
        }
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            ((Thread) it2.next()).join();
        }
        Assertions.assertThat(thresholdMeter.getCount()).isEqualTo(200L);
    }

    private static Runnable repeat(Runnable runnable, int i) {
        return () -> {
            for (int i2 = 0; i2 < i; i2++) {
                runnable.run();
            }
        };
    }

    private static List<Thread> getConcurrentThreads(Runnable runnable, int i) {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(new Thread(runnable));
        }
        return arrayList;
    }

    private static ThresholdMeter createLargeThresholdMeter() {
        return new ThresholdMeter(THRESHOLD_LARGE, INTERVAL, clock);
    }

    private static ThresholdMeter createSmallThresholdMeter() {
        return new ThresholdMeter(THRESHOLD_SMALL, INTERVAL, clock);
    }

    private static double toPerSecondRate(int i) {
        return (i * THRESHOLD_LARGE) / INTERVAL.toMillis();
    }
}
