/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.operators.coordination.ComponentClosingUtils;
import org.apache.flink.runtime.source.coordinator.ExecutorNotifier;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ExecutorNotifierTest {
    private ScheduledExecutorService workerExecutor;
    private ExecutorService executorToNotify;
    private ExecutorNotifier notifier;
    private Throwable exceptionInHandler;
    private CountDownLatch exceptionInHandlerLatch;

    @Before
    public void setup() {
        this.exceptionInHandler = null;
        this.exceptionInHandlerLatch = new CountDownLatch(1);
        this.workerExecutor = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "worker-thread"));
        this.executorToNotify = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "main-thread");
            t.setUncaughtExceptionHandler((thread, e) -> {
                this.exceptionInHandler = e;
                this.exceptionInHandlerLatch.countDown();
            });
            return t;
        });
        this.notifier = new ExecutorNotifier(this.workerExecutor, (Executor)this.executorToNotify);
    }

    @After
    public void tearDown() throws InterruptedException {
        ComponentClosingUtils.shutdownExecutorForcefully((ExecutorService)this.workerExecutor, (Duration)Duration.ofNanos(Long.MAX_VALUE));
        ComponentClosingUtils.shutdownExecutorForcefully((ExecutorService)this.executorToNotify, (Duration)Duration.ofNanos(Long.MAX_VALUE));
    }

    @Test
    public void testBasic() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicInteger result = new AtomicInteger(0);
        this.notifier.notifyReadyAsync(() -> 1234, (v, e) -> {
            result.set((int)v);
            latch.countDown();
        });
        latch.await();
        Assert.assertEquals((long)1234L, (long)result.get());
    }

    @Test
    public void testExceptionInCallable() {
        Exception exception = new Exception("Expected exception.");
        this.notifier.notifyReadyAsync(() -> {
            throw exception;
        }, (v, e) -> {
            Assert.assertEquals((Object)exception, (Object)e);
            Assert.assertNull((Object)v);
        });
    }

    @Test
    public void testExceptionInHandlerWhenHandlingException() throws InterruptedException {
        Exception exception1 = new Exception("Expected exception.");
        RuntimeException exception2 = new RuntimeException("Expected exception.");
        CountDownLatch latch = new CountDownLatch(1);
        this.notifier.notifyReadyAsync(() -> {
            throw exception1;
        }, (v, e) -> {
            Assert.assertEquals((Object)exception1, (Object)e);
            Assert.assertNull((Object)v);
            latch.countDown();
            throw exception2;
        });
        latch.await();
        this.exceptionInHandlerLatch.await(10000L, TimeUnit.MILLISECONDS);
        Assert.assertEquals((Object)exception2, (Object)this.exceptionInHandler);
    }

    @Test
    public void testExceptionInHandlerWhenHandlingResult() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        RuntimeException exception = new RuntimeException("Expected exception.");
        this.notifier.notifyReadyAsync(() -> 1234, (v, e) -> {
            latch.countDown();
            throw exception;
        });
        latch.await();
        this.exceptionInHandlerLatch.await(10000L, TimeUnit.MILLISECONDS);
        Assert.assertEquals((Object)exception, (Object)this.exceptionInHandler);
    }
}

