/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc;

import java.io.Serializable;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class AsyncCallsTest
extends TestLogger {
    private static final Duration timeout = Duration.ofSeconds(10L);
    private static RpcService rpcService;

    @BeforeClass
    public static void setup() throws Exception {
        rpcService = RpcSystem.load().localServiceBuilder(new Configuration()).createAndStart();
    }

    @AfterClass
    public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
        rpcService.closeAsync().get();
    }

    @Test
    public void testScheduleWithNoDelay() throws Exception {
        this.runScheduleWithNoDelayTest(TestEndpoint::new);
    }

    @Test
    public void testFencedScheduleWithNoDelay() throws Exception {
        this.runScheduleWithNoDelayTest(FencedTestEndpoint::new);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runScheduleWithNoDelayTest(RpcEndpointFactory factory) throws Exception {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean concurrentAccess = new AtomicBoolean(false);
        RpcEndpoint rpcEndpoint = factory.create(rpcService, lock, concurrentAccess);
        rpcEndpoint.start();
        try {
            TestGateway gateway = (TestGateway)rpcEndpoint.getSelfGateway(TestGateway.class);
            gateway.someCall();
            gateway.anotherCall();
            gateway.someCall();
            for (int i = 0; i < 10000; ++i) {
                rpcEndpoint.runAsync(() -> {
                    boolean holdsLock = lock.tryLock();
                    if (holdsLock) {
                        lock.unlock();
                    } else {
                        concurrentAccess.set(true);
                    }
                });
            }
            CompletableFuture result = rpcEndpoint.callAsync(() -> {
                boolean holdsLock = lock.tryLock();
                if (holdsLock) {
                    lock.unlock();
                } else {
                    concurrentAccess.set(true);
                }
                return "test";
            }, Duration.ofSeconds(30L));
            String str = (String)result.get(30L, TimeUnit.SECONDS);
            Assert.assertEquals((Object)"test", (Object)str);
            Assert.assertFalse((String)"Rpc Endpoint had concurrent access", (boolean)concurrentAccess.get());
        }
        catch (Throwable throwable) {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint[])new RpcEndpoint[]{rpcEndpoint});
            throw throwable;
        }
        RpcUtils.terminateRpcEndpoint((RpcEndpoint[])new RpcEndpoint[]{rpcEndpoint});
    }

    @Test
    public void testScheduleWithDelay() throws Exception {
        this.runScheduleWithDelayTest(TestEndpoint::new);
    }

    @Test
    public void testFencedScheduleWithDelay() throws Exception {
        this.runScheduleWithDelayTest(FencedTestEndpoint::new);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runScheduleWithDelayTest(RpcEndpointFactory factory) throws Exception {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean concurrentAccess = new AtomicBoolean(false);
        OneShotLatch latch = new OneShotLatch();
        long delay = 10L;
        RpcEndpoint rpcEndpoint = factory.create(rpcService, lock, concurrentAccess);
        rpcEndpoint.start();
        try {
            rpcEndpoint.runAsync(() -> {
                boolean holdsLock = lock.tryLock();
                if (holdsLock) {
                    lock.unlock();
                } else {
                    concurrentAccess.set(true);
                }
            });
            long start = System.nanoTime();
            rpcEndpoint.scheduleRunAsync(() -> {
                boolean holdsLock = lock.tryLock();
                if (holdsLock) {
                    lock.unlock();
                } else {
                    concurrentAccess.set(true);
                }
                latch.trigger();
            }, 10L, TimeUnit.MILLISECONDS);
            latch.await();
            long stop = System.nanoTime();
            Assert.assertFalse((String)"Rpc Endpoint had concurrent access", (boolean)concurrentAccess.get());
            Assert.assertTrue((String)"call was not properly delayed", ((stop - start) / 1000000L >= 10L ? 1 : 0) != 0);
        }
        catch (Throwable throwable) {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint[])new RpcEndpoint[]{rpcEndpoint});
            throw throwable;
        }
        RpcUtils.terminateRpcEndpoint((RpcEndpoint[])new RpcEndpoint[]{rpcEndpoint});
    }

    public static class FencedTestEndpoint
    extends FencedRpcEndpoint<UUID>
    implements TestGateway {
        private final ReentrantLock lock;
        private final AtomicBoolean concurrentAccess;
        private final OneShotLatch enteringSetNewFencingToken;
        private final OneShotLatch triggerSetNewFencingToken;

        protected FencedTestEndpoint(RpcService rpcService, ReentrantLock lock, AtomicBoolean concurrentAccess) {
            this(rpcService, lock, concurrentAccess, UUID.randomUUID(), new OneShotLatch(), new OneShotLatch());
        }

        protected FencedTestEndpoint(RpcService rpcService, UUID initialFencingToken, OneShotLatch enteringSetNewFencingToken, OneShotLatch triggerSetNewFencingToken) {
            this(rpcService, new ReentrantLock(), new AtomicBoolean(false), initialFencingToken, enteringSetNewFencingToken, triggerSetNewFencingToken);
        }

        private FencedTestEndpoint(RpcService rpcService, ReentrantLock lock, AtomicBoolean concurrentAccess, UUID initialFencingToken, OneShotLatch enteringSetNewFencingToken, OneShotLatch triggerSetNewFencingToken) {
            super(rpcService, (Serializable)initialFencingToken);
            this.lock = lock;
            this.concurrentAccess = concurrentAccess;
            this.enteringSetNewFencingToken = enteringSetNewFencingToken;
            this.triggerSetNewFencingToken = triggerSetNewFencingToken;
        }

        @Override
        public void someCall() {
            boolean holdsLock = this.lock.tryLock();
            if (holdsLock) {
                this.lock.unlock();
            } else {
                this.concurrentAccess.set(true);
            }
        }

        @Override
        public void anotherCall() {
            boolean holdsLock = this.lock.tryLock();
            if (holdsLock) {
                this.lock.unlock();
            } else {
                this.concurrentAccess.set(true);
            }
        }
    }

    private static class TestEndpoint
    extends RpcEndpoint
    implements TestGateway {
        private final ReentrantLock lock;
        private final AtomicBoolean concurrentAccess;

        TestEndpoint(RpcService rpcService, ReentrantLock lock, AtomicBoolean concurrentAccess) {
            super(rpcService);
            this.lock = lock;
            this.concurrentAccess = concurrentAccess;
        }

        @Override
        public void someCall() {
            boolean holdsLock = this.lock.tryLock();
            if (holdsLock) {
                this.lock.unlock();
            } else {
                this.concurrentAccess.set(true);
            }
        }

        @Override
        public void anotherCall() {
            boolean holdsLock = this.lock.tryLock();
            if (holdsLock) {
                this.lock.unlock();
            } else {
                this.concurrentAccess.set(true);
            }
        }
    }

    public static interface TestGateway
    extends RpcGateway {
        public void someCall();

        public void anotherCall();
    }

    @FunctionalInterface
    private static interface RpcEndpointFactory {
        public RpcEndpoint create(RpcService var1, ReentrantLock var2, AtomicBoolean var3);
    }
}

