/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source.datagen;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.StatefulSequenceSourceTest;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.junit.Assert;
import org.junit.Test;

public class DataGeneratorSourceTest {
    @Test
    public void testRandomGenerator() throws Exception {
        long min = 10L;
        long max = 20L;
        final DataGeneratorSource source = new DataGeneratorSource((DataGenerator)RandomGenerator.longGenerator((long)min, (long)max));
        StreamSource src = new StreamSource((SourceFunction)source);
        AbstractStreamOperatorTestHarness testHarness = new AbstractStreamOperatorTestHarness(src, 1, 1, 0);
        testHarness.open();
        final int totalNumber = 1000;
        final ArrayList results = new ArrayList();
        source.run((SourceFunction.SourceContext)new SourceFunction.SourceContext<Long>(){
            private Object lock = new Object();
            private int emitNumber = 0;

            public void collect(Long element) {
                if (++this.emitNumber > totalNumber) {
                    source.isRunning = false;
                }
                results.add(element);
            }

            public void collectWithTimestamp(Long element, long timestamp) {
            }

            public void emitWatermark(Watermark mark) {
            }

            public void markAsTemporarilyIdle() {
            }

            public Object getCheckpointLock() {
                return this.lock;
            }

            public void close() {
            }
        });
        for (Long l : results) {
            Assert.assertTrue((l >= min && l <= max ? 1 : 0) != 0);
        }
    }

    @Test
    public void testSequenceCheckpointRestore() throws Exception {
        boolean initElement = false;
        int maxElement = 100;
        HashSet<Long> expectedOutput = new HashSet<Long>();
        for (long i = 0L; i <= 100L; ++i) {
            expectedOutput.add(i);
        }
        DataGeneratorSourceTest.innerTestDataGenCheckpointRestore(() -> new DataGeneratorSource((DataGenerator)SequenceGenerator.longGenerator((long)0L, (long)100L)), expectedOutput);
    }

    public static <T> void innerTestDataGenCheckpointRestore(Supplier<DataGeneratorSource<T>> supplier, Set<T> expectedOutput) throws Exception {
        int maxParallelsim = 2;
        ConcurrentHashMap outputCollector = new ConcurrentHashMap();
        OneShotLatch latchToTrigger1 = new OneShotLatch();
        OneShotLatch latchToWait1 = new OneShotLatch();
        OneShotLatch latchToTrigger2 = new OneShotLatch();
        OneShotLatch latchToWait2 = new OneShotLatch();
        DataGeneratorSource source1 = supplier.get();
        StreamSource src1 = new StreamSource(source1);
        AbstractStreamOperatorTestHarness testHarness1 = new AbstractStreamOperatorTestHarness(src1, 2, 2, 0);
        testHarness1.open();
        DataGeneratorSource source2 = supplier.get();
        StreamSource src2 = new StreamSource(source2);
        AbstractStreamOperatorTestHarness testHarness2 = new AbstractStreamOperatorTestHarness(src2, 2, 2, 1);
        testHarness2.open();
        Thread runner1 = new Thread(() -> {
            try {
                source1.run(new StatefulSequenceSourceTest.BlockingSourceContext("1", latchToTrigger1, latchToWait1, outputCollector, 21));
            }
            catch (Throwable t) {
                t.printStackTrace();
            }
        });
        Thread runner2 = new Thread(() -> {
            try {
                source2.run(new StatefulSequenceSourceTest.BlockingSourceContext("2", latchToTrigger2, latchToWait2, outputCollector, 32));
            }
            catch (Throwable t) {
                t.printStackTrace();
            }
        });
        runner1.start();
        runner2.start();
        if (!latchToTrigger1.isTriggered()) {
            latchToTrigger1.await();
        }
        if (!latchToTrigger2.isTriggered()) {
            latchToTrigger2.await();
        }
        OperatorSubtaskState snapshot = AbstractStreamOperatorTestHarness.repackageState(testHarness1.snapshot(0L, 0L), testHarness2.snapshot(0L, 0L));
        DataGeneratorSource source3 = supplier.get();
        StreamSource src3 = new StreamSource(source3);
        OperatorSubtaskState initState = AbstractStreamOperatorTestHarness.repartitionOperatorState(snapshot, 2, 2, 1, 0);
        AbstractStreamOperatorTestHarness testHarness3 = new AbstractStreamOperatorTestHarness(src3, 2, 1, 0);
        testHarness3.setup();
        testHarness3.initializeState(initState);
        testHarness3.open();
        OneShotLatch latchToTrigger3 = new OneShotLatch();
        OneShotLatch latchToWait3 = new OneShotLatch();
        latchToWait3.trigger();
        Thread runner3 = new Thread(() -> {
            try {
                source3.run(new StatefulSequenceSourceTest.BlockingSourceContext("3", latchToTrigger3, latchToWait3, outputCollector, 3));
            }
            catch (Throwable t) {
                t.printStackTrace();
            }
        });
        runner3.start();
        runner3.join();
        Assert.assertEquals((long)3L, (long)outputCollector.size());
        HashSet dedupRes = new HashSet(expectedOutput.size());
        for (Map.Entry elementsPerTask : outputCollector.entrySet()) {
            String key = (String)elementsPerTask.getKey();
            List elements = (List)outputCollector.get(key);
            Assert.assertTrue((elements.size() > 0 ? 1 : 0) != 0);
            for (Object elem : elements) {
                if (!dedupRes.add(elem)) {
                    Assert.fail((String)("Duplicate entry: " + elem));
                }
                if (expectedOutput.contains(elem)) continue;
                Assert.fail((String)("Unexpected element: " + elem));
            }
        }
        Assert.assertEquals((long)expectedOutput.size(), (long)dedupRes.size());
        latchToWait1.trigger();
        latchToWait2.trigger();
        runner1.join();
        runner2.join();
    }
}

