/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.metrics;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.metrics.ThresholdMeter;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ThresholdMeterTest
extends TestLogger {
    private static final double THRESHOLD_LARGE = 1000.0;
    private static final double THRESHOLD_SMALL = 5.0;
    private static final Duration INTERVAL = Duration.ofMillis(50L);
    private static final long SLEEP = 10L;
    private static final double ERROR = 1.0E-6;
    private static ManualClock clock;

    @Before
    public void setup() {
        clock = new ManualClock(42000000L);
    }

    @Test
    public void testMarkEvent() {
        ThresholdMeter thresholdMeter = ThresholdMeterTest.createLargeThresholdMeter();
        thresholdMeter.markEvent();
        clock.advanceTime(10L, TimeUnit.MILLISECONDS);
        Assert.assertThat((Object)thresholdMeter.getCount(), (Matcher)Matchers.is((Object)1L));
        Assert.assertThat((Object)thresholdMeter.getRate(), (Matcher)Matchers.closeTo((double)ThresholdMeterTest.toPerSecondRate(1), (double)1.0E-6));
        thresholdMeter.markEvent();
        Assert.assertThat((Object)thresholdMeter.getCount(), (Matcher)Matchers.is((Object)2L));
        clock.advanceTime(10L, TimeUnit.MILLISECONDS);
        Assert.assertThat((Object)thresholdMeter.getRate(), (Matcher)Matchers.closeTo((double)ThresholdMeterTest.toPerSecondRate(2), (double)1.0E-6));
    }

    @Test
    public void testMarkMultipleEvents() {
        ThresholdMeter thresholdMeter = ThresholdMeterTest.createLargeThresholdMeter();
        thresholdMeter.markEvent(2L);
        clock.advanceTime(20L, TimeUnit.MILLISECONDS);
        Assert.assertThat((Object)thresholdMeter.getCount(), (Matcher)Matchers.is((Object)2L));
        Assert.assertThat((Object)thresholdMeter.getRate(), (Matcher)Matchers.closeTo((double)ThresholdMeterTest.toPerSecondRate(2), (double)1.0E-6));
    }

    @Test
    public void testCheckAgainstThresholdNotExceeded() {
        ThresholdMeter thresholdMeter = ThresholdMeterTest.createSmallThresholdMeter();
        int i = 0;
        while ((double)i < 4.0) {
            thresholdMeter.markEvent();
            clock.advanceTime(10L, TimeUnit.MILLISECONDS);
            thresholdMeter.checkAgainstThreshold();
            ++i;
        }
    }

    @Test
    public void testCheckAgainstThreshold() {
        ThresholdMeter thresholdMeter = ThresholdMeterTest.createSmallThresholdMeter();
        int i = 0;
        while ((double)i < 4.0) {
            thresholdMeter.markEvent();
            clock.advanceTime(10L, TimeUnit.MILLISECONDS);
            thresholdMeter.checkAgainstThreshold();
            ++i;
        }
        thresholdMeter.markEvent();
        try {
            thresholdMeter.checkAgainstThreshold();
            Assert.fail();
        }
        catch (ThresholdMeter.ThresholdExceedException thresholdExceedException) {
            // empty catch block
        }
    }

    @Test
    public void testUpdateInterval() {
        ThresholdMeter thresholdMeter = ThresholdMeterTest.createSmallThresholdMeter();
        thresholdMeter.markEvent();
        clock.advanceTime(INTERVAL.toMillis() * 2L, TimeUnit.MILLISECONDS);
        int i = 0;
        while ((double)i < 4.0) {
            thresholdMeter.markEvent();
            ++i;
        }
        Assert.assertThat((Object)thresholdMeter.getCount(), (Matcher)Matchers.is((Object)5L));
        Assert.assertThat((Object)thresholdMeter.getRate(), (Matcher)Matchers.closeTo((double)ThresholdMeterTest.toPerSecondRate(4), (double)1.0E-6));
        thresholdMeter.checkAgainstThreshold();
    }

    @Test
    public void testConcurrentAccess() throws Exception {
        ThresholdMeter thresholdMeter = new ThresholdMeter(1000.0, INTERVAL);
        int repeatNum = 100;
        int concurrency = 2;
        ArrayList<Thread> threads = new ArrayList<Thread>();
        threads.addAll(ThresholdMeterTest.getConcurrentThreads(ThresholdMeterTest.repeat(() -> ((ThresholdMeter)thresholdMeter).markEvent(), 100), 2));
        threads.addAll(ThresholdMeterTest.getConcurrentThreads(ThresholdMeterTest.repeat(() -> ((ThresholdMeter)thresholdMeter).getRate(), 100), 2));
        threads.addAll(ThresholdMeterTest.getConcurrentThreads(ThresholdMeterTest.repeat(() -> ((ThresholdMeter)thresholdMeter).checkAgainstThreshold(), 100), 2));
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        Assert.assertEquals((long)200L, (long)thresholdMeter.getCount());
    }

    private static Runnable repeat(Runnable task, int repeatNum) {
        return () -> {
            for (int i = 0; i < repeatNum; ++i) {
                task.run();
            }
        };
    }

    private static List<Thread> getConcurrentThreads(Runnable task, int concurrency) {
        ArrayList<Thread> threads = new ArrayList<Thread>();
        for (int i = 0; i < concurrency; ++i) {
            threads.add(new Thread(task));
        }
        return threads;
    }

    private static ThresholdMeter createLargeThresholdMeter() {
        return new ThresholdMeter(1000.0, INTERVAL, (Clock)clock);
    }

    private static ThresholdMeter createSmallThresholdMeter() {
        return new ThresholdMeter(5.0, INTERVAL, (Clock)clock);
    }

    private static double toPerSecondRate(int eventsPerInterval) {
        return (double)eventsPerInterval * 1000.0 / (double)INTERVAL.toMillis();
    }
}

