/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.WorkerExecutor;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.test.core.VertxTestBase;
import io.vertx.tests.deployment.VirtualThreadDeploymentTest;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;

public class VirtualThreadContextTest
extends VertxTestBase {
    VertxInternal vertx;

    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.vertx = (VertxInternal)((VertxTestBase)this).vertx;
    }

    @Test
    public void testContext() {
        Assume.assumeTrue((boolean)this.isVirtualThreadAvailable());
        this.vertx.createVirtualThreadContext().runOnContext(v -> {
            Thread thread = Thread.currentThread();
            this.assertTrue(VirtualThreadDeploymentTest.isVirtual(thread));
            ContextInternal context = this.vertx.getOrCreateContext();
            Executor executor = context.executor();
            this.assertTrue(executor instanceof WorkerExecutor);
            context.runOnContext(v2 -> {
                this.assertSame(context, this.vertx.getOrCreateContext());
                this.testComplete();
            });
        });
        this.await();
    }

    @Test
    public void testAwaitFutureSuccess() {
        Assume.assumeTrue((boolean)this.isVirtualThreadAvailable());
        Object result = new Object();
        this.vertx.createVirtualThreadContext().runOnContext(v -> {
            ContextInternal context = this.vertx.getOrCreateContext();
            PromiseInternal promise = context.promise();
            new Thread(() -> {
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                promise.complete(result);
            }).start();
            this.assertSame(result, Future.await((Future)promise.future()));
            this.testComplete();
        });
        this.await();
    }

    @Test
    public void testAwaitFutureFailure() {
        Assume.assumeTrue((boolean)this.isVirtualThreadAvailable());
        Exception failure = new Exception();
        this.vertx.createVirtualThreadContext().runOnContext(v -> {
            ContextInternal context = this.vertx.getOrCreateContext();
            PromiseInternal promise = context.promise();
            new Thread(() -> {
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                promise.fail((Throwable)failure);
            }).start();
            try {
                Future.await((Future)promise.future());
            }
            catch (Exception e) {
                this.assertSame(failure, e);
                this.testComplete();
                return;
            }
            this.fail();
        });
        this.await();
    }

    @Test
    public void testAwaitCompoundFuture() {
        Assume.assumeTrue((boolean)this.isVirtualThreadAvailable());
        Object result = new Object();
        this.vertx.createVirtualThreadContext().runOnContext(v -> {
            ContextInternal context = this.vertx.getOrCreateContext();
            PromiseInternal promise = context.promise();
            new Thread(() -> {
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                promise.complete(result);
            }).start();
            this.assertSame("HELLO", Future.await((Future)promise.future().map(res -> "HELLO")));
            this.testComplete();
        });
        this.await();
    }

    @Test
    public void testDuplicateUseSameThread() {
        Assume.assumeTrue((boolean)this.isVirtualThreadAvailable());
        int num = 1000;
        this.waitFor(num);
        this.vertx.createVirtualThreadContext().runOnContext(v -> {
            ContextInternal context = this.vertx.getOrCreateContext();
            Thread th = Thread.currentThread();
            for (int i = 0; i < num; ++i) {
                ContextInternal duplicate = context.duplicate();
                duplicate.runOnContext(v2 -> this.complete());
            }
        });
        this.await();
    }

    @Test
    public void testDuplicateConcurrentAwait() {
        Assume.assumeTrue((boolean)this.isVirtualThreadAvailable());
        int num = 1000;
        this.waitFor(num);
        this.vertx.createVirtualThreadContext().runOnContext(v -> {
            ContextInternal context = this.vertx.getOrCreateContext();
            Object lock = new Object();
            ArrayList list = new ArrayList();
            for (int i = 0; i < num; ++i) {
                ContextInternal duplicate = context.duplicate();
                duplicate.runOnContext(v2 -> {
                    boolean complete;
                    PromiseInternal promise = duplicate.promise();
                    Object object = lock;
                    synchronized (object) {
                        list.add(promise);
                        complete = list.size() == num;
                    }
                    if (complete) {
                        context.runOnContext(v3 -> {
                            Object object = lock;
                            synchronized (object) {
                                list.forEach(p -> p.complete(null));
                            }
                        });
                    }
                    Future f = promise.future();
                    Future.await((Future)f);
                    this.complete();
                });
            }
        });
        this.await();
    }

    @Test
    public void testTimer() {
        Assume.assumeTrue((boolean)this.isVirtualThreadAvailable());
        this.vertx.createVirtualThreadContext().runOnContext(v -> {
            ContextInternal context = this.vertx.getOrCreateContext();
            PromiseInternal promise = context.promise();
            this.vertx.setTimer(100L, id -> promise.complete((Object)"foo"));
            String res = (String)Future.await((Future)promise);
            this.assertEquals("foo", res);
            this.testComplete();
        });
        this.await();
    }

    @Test
    public void testInThread() {
        Assume.assumeTrue((boolean)this.isVirtualThreadAvailable());
        this.vertx.createVirtualThreadContext().runOnContext(v1 -> {
            ContextInternal context = this.vertx.getOrCreateContext();
            this.assertTrue(context.inThread());
            new Thread(() -> {
                boolean wasNotInThread = !context.inThread();
                context.runOnContext(v2 -> {
                    this.assertTrue(wasNotInThread);
                    this.assertTrue(context.inThread());
                    this.testComplete();
                });
            }).start();
        });
        this.await();
    }

    private void sleep(AtomicInteger inflight) {
        this.assertEquals(0L, inflight.getAndIncrement());
        try {
            Thread.sleep(100L);
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        finally {
            inflight.decrementAndGet();
        }
    }

    @Test
    public void testSerializeBlocking() throws Exception {
        Assume.assumeTrue((boolean)this.isVirtualThreadAvailable());
        AtomicInteger inflight = new AtomicInteger();
        this.vertx.createVirtualThreadContext().runOnContext(v1 -> {
            ContextInternal ctx = this.vertx.getOrCreateContext();
            for (int i = 0; i < 10; ++i) {
                ctx.runOnContext(v2 -> this.sleep(inflight));
            }
            ctx.runOnContext(v -> this.testComplete());
        });
        this.await();
    }

    @Test
    public void testVirtualThreadsNotAvailable() {
        Assume.assumeFalse((boolean)this.isVirtualThreadAvailable());
        try {
            this.vertx.createVirtualThreadContext();
            this.fail();
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void testVirtualThreadInterruptOnClose() throws Exception {
        Assume.assumeTrue((boolean)this.isVirtualThreadAvailable());
        ContextInternal ctx = this.vertx.createVirtualThreadContext();
        ctx.exceptionHandler(err -> {});
        PromiseInternal promise = ctx.promise();
        AtomicReference ref = new AtomicReference();
        AtomicBoolean interrupted = new AtomicBoolean();
        ctx.runOnContext(arg_0 -> this.lambda$testVirtualThreadInterruptOnClose$24(ref, (Promise)promise, interrupted, arg_0));
        VirtualThreadContextTest.assertWaitUntil(() -> ref.get() != null && ((Thread)ref.get()).getState() == Thread.State.WAITING);
        ctx.close().toCompletionStage().toCompletableFuture().get(20L, TimeUnit.SECONDS);
        VirtualThreadContextTest.assertWaitUntil(interrupted::get);
    }

    @Test
    public void testVirtualThreadInterruptOnClose2() throws Exception {
        Assume.assumeTrue((boolean)this.isVirtualThreadAvailable());
        ContextInternal ctx = this.vertx.createVirtualThreadContext();
        AtomicReference ref = new AtomicReference();
        AtomicBoolean interrupted = new AtomicBoolean();
        CountDownLatch latch = new CountDownLatch(1);
        ctx.runOnContext(v -> {
            try {
                ref.set(Thread.currentThread());
                latch.await();
                this.fail();
            }
            catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        VirtualThreadContextTest.assertWaitUntil(() -> ref.get() != null && ((Thread)ref.get()).getState() == Thread.State.WAITING);
        ctx.close().toCompletionStage().toCompletableFuture().get(20L, TimeUnit.SECONDS);
        VirtualThreadContextTest.assertWaitUntil(interrupted::get);
    }

    @Test
    public void testContextCloseContextSerialization() throws Exception {
        int num = 4;
        Assume.assumeTrue((boolean)this.isVirtualThreadAvailable());
        ContextInternal ctx = this.vertx.createVirtualThreadContext();
        Thread[] threads = new Thread[num];
        List promises = IntStream.range(0, num).mapToObj(idx -> Promise.promise()).collect(Collectors.toList());
        ConcurrentLinkedDeque<CyclicBarrier> latches = new ConcurrentLinkedDeque<CyclicBarrier>();
        CyclicBarrier[] l = new CyclicBarrier[num];
        AtomicInteger count = new AtomicInteger();
        for (int i = 0; i < num; ++i) {
            CyclicBarrier latch;
            int idx2 = i;
            l[i] = latch = new CyclicBarrier(2);
            latches.add(latch);
            ctx.runOnContext(v -> {
                threads[idx] = Thread.currentThread();
                try {
                    Future.await((Future)((Promise)promises.get(idx2)).future());
                    this.fail();
                }
                catch (Exception e) {
                    this.assertTrue(e instanceof InterruptedException);
                    CyclicBarrier barrier = (CyclicBarrier)latches.removeFirst();
                    int val = count.addAndGet(1);
                    this.assertTrue(val == 1);
                    try {
                        barrier.await();
                    }
                    catch (Exception ex) {
                        throw new RuntimeException(ex);
                    }
                    finally {
                        count.decrementAndGet();
                    }
                }
            });
        }
        VirtualThreadContextTest.assertWaitUntil(() -> {
            for (Thread thread : threads) {
                if (thread != null && thread.getState() == Thread.State.WAITING) continue;
                return false;
            }
            return true;
        });
        Future f = ctx.close();
        for (int i = 0; i < num; ++i) {
            try {
                l[i].await();
                continue;
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
        }
        f.toCompletionStage().toCompletableFuture().get(20L, TimeUnit.SECONDS);
    }

    @Test
    public void testAwaitWhenClosed() throws Exception {
        Assume.assumeTrue((boolean)this.isVirtualThreadAvailable());
        ContextInternal ctx = this.vertx.createVirtualThreadContext();
        CountDownLatch latch = new CountDownLatch(1);
        ctx.runOnContext(v -> {
            latch.countDown();
            try {
                new CountDownLatch(1).await();
                this.fail();
            }
            catch (InterruptedException expected) {
                this.assertFalse(Thread.currentThread().isInterrupted());
            }
            try {
                Future.await((Future)Promise.promise().future());
                this.fail();
            }
            catch (Exception e) {
                this.assertEquals(InterruptedException.class, e.getClass());
                this.testComplete();
            }
        });
        this.awaitLatch(latch);
        ctx.close();
        this.await();
    }

    @Test
    public void testSubmitAfterClose() {
        Assume.assumeTrue((boolean)this.isVirtualThreadAvailable());
        ContextInternal ctx = this.vertx.createVirtualThreadContext();
        ctx.close();
        ctx.runOnContext(v -> this.testComplete());
        this.await();
    }

    private /* synthetic */ void lambda$testVirtualThreadInterruptOnClose$24(AtomicReference ref, Promise promise, AtomicBoolean interrupted, Void v) {
        try {
            ref.set(Thread.currentThread());
            Future fut = promise.future();
            Future.await((Future)fut);
            this.fail();
        }
        catch (Throwable e) {
            if (e instanceof InterruptedException) {
                interrupted.set(true);
            }
            throw e;
        }
    }
}

