/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.leaderelection;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriver;
import org.apache.flink.runtime.leaderelection.LeaderElectionEventHandler;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.leaderelection.TestingContender;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionEventHandler;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver;
import org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalEventHandler;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.TestingLeaderRetrievalEventHandler;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.CreateBuilder;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.CreateMode;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests ZooKeeper-based leader election and leader retrieval against a Curator
 * {@link TestingServer}.
 *
 * <p>A new testing server, configuration and Curator client are created before every test
 * and released again afterwards.
 */
public class ZooKeeperLeaderElectionTest
extends TestLogger {
    /** ZooKeeper testing server the tests connect to; created per test. */
    private TestingServer testingServer;
    /** Configuration carrying the testing server's connect string and the "zookeeper" HA mode. */
    private Configuration configuration;
    /** Curator client started from {@link #configuration}; shared by most tests. */
    private CuratorFramework client;
    /** Address registered by the single-contender tests. */
    private static final String TEST_URL = "akka//user/jobmanager";
    /** Leader information (random session id plus the test address) handed to the testing election handlers. */
    private static final LeaderInformation TEST_LEADER = LeaderInformation.known((UUID)UUID.randomUUID(), (String)"akka//user/jobmanager");
    /** Wait budget used by the tests; same value (200000) that is passed to the wait helpers — presumably milliseconds. */
    private static final long timeout = 200000L;
    /** Logger for progress messages of the re-election test. */
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionTest.class);

    /**
     * Starts a ZooKeeper testing server, builds a configuration that points at it and
     * starts the Curator client used by the tests.
     *
     * @throws RuntimeException if the testing server cannot be started
     */
    @Before
    public void before() {
        final TestingServer server;
        try {
            server = new TestingServer();
        } catch (Exception cause) {
            throw new RuntimeException("Could not start ZooKeeper testing cluster.", cause);
        }
        this.testingServer = server;

        final Configuration config = new Configuration();
        this.configuration = config;
        config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, server.getConnectString());
        config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");

        this.client = ZooKeeperUtils.startCuratorFramework(config);
    }

    /**
     * Releases the Curator client and the ZooKeeper testing server created in {@code before()}.
     *
     * <p>The server is shut down even if closing the client fails, and it is closed rather
     * than merely stopped so that its temporary directory is removed as well.
     *
     * @throws IOException if shutting down the testing server fails
     */
    @After
    public void after() throws IOException {
        try {
            if (this.client != null) {
                this.client.close();
            }
        } finally {
            this.client = null;
            if (this.testingServer != null) {
                try {
                    // close() = stop + delete temp directory; stop() alone leaves the directory behind.
                    this.testingServer.close();
                } finally {
                    this.testingServer = null;
                }
            }
        }
    }

    /**
     * Verifies that a leader elected through the ZooKeeper election driver is confirmed with
     * {@code TEST_LEADER} and that a retrieval driver on the same client observes the same
     * session id and address.
     */
    @Test
    public void testZooKeeperLeaderElectionRetrieval() throws Exception {
        final TestingLeaderElectionEventHandler electionHandler = new TestingLeaderElectionEventHandler(TEST_LEADER);
        final TestingLeaderRetrievalEventHandler retrievalHandler = new TestingLeaderRetrievalEventHandler();
        ZooKeeperLeaderElectionDriver electionDriver = null;
        ZooKeeperLeaderRetrievalDriver retrievalDriver = null;

        try {
            electionDriver = createAndInitLeaderElectionDriver(client, electionHandler);
            retrievalDriver = ZooKeeperUtils.createLeaderRetrievalDriverFactory(client, configuration)
                    .createLeaderRetrievalDriver(retrievalHandler, retrievalHandler::handleError);

            // Election side: the handler must confirm exactly the leader information it was given.
            electionHandler.waitForLeader(timeout);
            Assert.assertThat(electionHandler.getConfirmedLeaderInformation(), org.hamcrest.Matchers.is(TEST_LEADER));

            // Retrieval side: the published leader must match.
            retrievalHandler.waitForNewLeader(timeout);
            Assert.assertThat(retrievalHandler.getLeaderSessionID(), org.hamcrest.Matchers.is(TEST_LEADER.getLeaderSessionID()));
            Assert.assertThat(retrievalHandler.getAddress(), org.hamcrest.Matchers.is(TEST_LEADER.getLeaderAddress()));
        } finally {
            electionHandler.close();
            if (electionDriver != null) {
                electionDriver.close();
            }
            if (retrievalDriver != null) {
                retrievalDriver.close();
            }
        }
    }

    /**
     * Starts {@code num} contenders and repeatedly stops the election service of whichever
     * contender the listener reports as leader, expecting every contender to have been
     * leader once before a five minute deadline expires.
     */
    @Test
    public void testZooKeeperReelection() throws Exception {
        Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5L));
        int num = 10;
        DefaultLeaderElectionService[] leaderElectionService = new DefaultLeaderElectionService[num];
        TestingContender[] contenders = new TestingContender[num];
        DefaultLeaderRetrievalService leaderRetrievalService = null;
        TestingListener listener = new TestingListener();
        try {
            leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(this.client, this.configuration);
            LOG.debug("Start leader retrieval service for the TestingListener.");
            leaderRetrievalService.start(listener);

            for (int i = 0; i < num; ++i) {
                leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(this.client, this.configuration);
                contenders[i] = new TestingContender(this.createAddress(i), leaderElectionService[i]);
                LOG.debug("Start leader election service for contender #{}.", i);
                leaderElectionService[i].start(contenders[i]);
            }

            Pattern regex = Pattern.compile("akka//user/jobmanager_(\\d+)");
            int numberSeenLeaders = 0;
            while (deadline.hasTimeLeft() && numberSeenLeaders < num) {
                LOG.debug("Wait for new leader #{}.", numberSeenLeaders);
                String address = listener.waitForNewLeader(deadline.timeLeft().toMillis());
                Matcher m = regex.matcher(address);
                if (!m.find()) {
                    Assert.fail("Did not find the leader's index.");
                }
                int index = Integer.parseInt(m.group(1));
                TestingContender contender = contenders[index];

                // Only act once the listener's view and the contender's own session id agree;
                // otherwise wait for the next notification.
                if (!address.equals(this.createAddress(index))
                        || !listener.getLeaderSessionID().equals(contender.getLeaderSessionID())) {
                    continue;
                }

                // Log the index of the contender that is actually being stopped.
                LOG.debug("Stop leader election service of contender #{}.", index);
                leaderElectionService[index].stop();
                leaderElectionService[index] = null;
                ++numberSeenLeaders;
            }

            Assert.assertFalse("Did not complete the leader reelection in time.", deadline.isOverdue());
            Assert.assertEquals((long) num, (long) numberSeenLeaders);
        } finally {
            try {
                if (leaderRetrievalService != null) {
                    leaderRetrievalService.stop();
                }
            } finally {
                // Runs even if stopping the retrieval service failed, so no election service is leaked.
                for (DefaultLeaderElectionService electionService : leaderElectionService) {
                    if (electionService != null) {
                        electionService.stop();
                    }
                }
            }
        }
    }

    /**
     * Builds the address of the contender with the given index.
     *
     * @param i contender index
     * @return the fixed address prefix followed by {@code i}
     */
    @Nonnull
    private String createAddress(int i) {
        final String suffix = Integer.toString(i);
        return "akka//user/jobmanager_".concat(suffix);
    }

    /**
     * Runs a fixed number of rounds in which the current leader's election service is stopped
     * and immediately replaced by a new contender whose address carries an incremented
     * try counter, checking each round that the listener and the leading contender agree on
     * the session id.
     */
    @Test
    public void testZooKeeperReelectionWithReplacement() throws Exception {
        final int contenderCount = 3;
        final int rounds = 30;
        final DefaultLeaderElectionService[] electionServices = new DefaultLeaderElectionService[contenderCount];
        final TestingContender[] contenders = new TestingContender[contenderCount];
        DefaultLeaderRetrievalService retrievalService = null;
        final TestingListener listener = new TestingListener();

        try {
            retrievalService = ZooKeeperUtils.createLeaderRetrievalService(client, configuration);
            retrievalService.start(listener);

            for (int slot = 0; slot < contenderCount; slot++) {
                electionServices[slot] = ZooKeeperUtils.createLeaderElectionService(client, configuration);
                contenders[slot] = new TestingContender("akka//user/jobmanager_" + slot + "_0", electionServices[slot]);
                electionServices[slot].start(contenders[slot]);
            }

            final Pattern addressPattern = Pattern.compile("akka//user/jobmanager_(\\d+)_(\\d+)");

            int round = 0;
            while (round < rounds) {
                listener.waitForNewLeader(timeout);
                final Matcher match = addressPattern.matcher(listener.getAddress());
                if (!match.find()) {
                    throw new Exception("Did not find the leader's index.");
                }

                final int slot = Integer.parseInt(match.group(1));
                final int previousTry = Integer.parseInt(match.group(2));
                Assert.assertEquals((Object) listener.getLeaderSessionID(), (Object) contenders[slot].getLeaderSessionID());

                // Retire the current leader and put a fresh contender into the same slot.
                electionServices[slot].stop();
                electionServices[slot] = ZooKeeperUtils.createLeaderElectionService(client, configuration);
                contenders[slot] = new TestingContender("akka//user/jobmanager_" + slot + "_" + (previousTry + 1), electionServices[slot]);
                electionServices[slot].start(contenders[slot]);

                round++;
            }
        } finally {
            if (retrievalService != null) {
                retrievalService.stop();
            }
            for (DefaultLeaderElectionService service : electionServices) {
                if (service != null) {
                    service.stop();
                }
            }
        }
    }

    /**
     * Overwrites the leader node with data of a bogus contender through a second client and
     * checks that a retrieval driver eventually reports {@code TEST_LEADER} again.
     */
    @Test
    public void testLeaderShouldBeCorrectedWhenOverwritten() throws Exception {
        final String faultyContenderUrl = "faultyContender";
        final String leaderPath = "/leader";
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH, leaderPath);

        final TestingLeaderElectionEventHandler electionHandler = new TestingLeaderElectionEventHandler(TEST_LEADER);
        final TestingLeaderRetrievalEventHandler retrievalHandler = new TestingLeaderRetrievalEventHandler();
        ZooKeeperLeaderElectionDriver electionDriver = null;
        ZooKeeperLeaderRetrievalDriver retrievalDriver = null;
        CuratorFramework secondClient = null;

        try {
            electionDriver = createAndInitLeaderElectionDriver(client, electionHandler);
            electionHandler.waitForLeader(timeout);
            Assert.assertThat(electionHandler.getConfirmedLeaderInformation(), org.hamcrest.Matchers.is(TEST_LEADER));

            secondClient = ZooKeeperUtils.startCuratorFramework(configuration);

            // Payload written over the leader node: an address followed by a session id.
            final ByteArrayOutputStream serialized = new ByteArrayOutputStream();
            final ObjectOutputStream out = new ObjectOutputStream(serialized);
            out.writeUTF(faultyContenderUrl);
            out.writeObject(UUID.randomUUID());
            out.close();

            // Delete and re-create the node until our write wins.
            while (true) {
                secondClient.delete().forPath(leaderPath);
                try {
                    secondClient.create().forPath(leaderPath, serialized.toByteArray());
                    break;
                } catch (KeeperException.NodeExistsException ignored) {
                    // The node was re-created between our delete and create; try again.
                }
            }

            retrievalDriver = ZooKeeperUtils.createLeaderRetrievalDriverFactory(client, configuration)
                    .createLeaderRetrievalDriver(retrievalHandler, retrievalHandler::handleError);

            // The first notification may still carry the bogus address; if so, wait for the next one.
            final String firstAddress = retrievalHandler.waitForNewLeader(timeout);
            if (firstAddress.equals(faultyContenderUrl)) {
                retrievalHandler.waitForNewLeader(timeout);
            }

            Assert.assertThat(retrievalHandler.getLeaderSessionID(), org.hamcrest.Matchers.is(TEST_LEADER.getLeaderSessionID()));
            Assert.assertThat(retrievalHandler.getAddress(), org.hamcrest.Matchers.is(TEST_LEADER.getLeaderAddress()));
        } finally {
            electionHandler.close();
            if (electionDriver != null) {
                electionDriver.close();
            }
            if (retrievalDriver != null) {
                retrievalDriver.close();
            }
            if (secondClient != null) {
                secondClient.close();
            }
        }
    }

    /**
     * Makes the Curator client's create builder throw on {@code forPath} and checks that the
     * exception reaches the election event handler's error callback.
     */
    @Test
    public void testExceptionForwarding() throws Exception {
        ZooKeeperLeaderElectionDriver electionDriver = null;
        final TestingLeaderElectionEventHandler electionHandler = new TestingLeaderElectionEventHandler(TEST_LEADER);
        CuratorFramework spiedClient = null;

        final CreateBuilder mockCreateBuilder = Mockito.mock(CreateBuilder.class, Mockito.RETURNS_DEEP_STUBS);
        final String exMsg = "Test exception";
        final Exception testException = new Exception(exMsg);

        try {
            spiedClient = Mockito.spy(ZooKeeperUtils.startCuratorFramework(configuration));

            // Route every create() on the spied client to the deep-stubbed builder ...
            Mockito.doAnswer(invocation -> mockCreateBuilder).when(spiedClient).create();

            // ... whose forPath(path, data) always fails with the test exception.
            Mockito.when(
                            mockCreateBuilder
                                    .creatingParentsIfNeeded()
                                    .withMode(Matchers.any(CreateMode.class))
                                    .forPath(Mockito.anyString(), Mockito.any(byte[].class)))
                    .thenThrow(testException);

            electionDriver = createAndInitLeaderElectionDriver(spiedClient, electionHandler);

            electionHandler.waitForError(timeout);
            Assert.assertNotNull(electionHandler.getError());
            Assert.assertThat(
                    ExceptionUtils.findThrowableWithMessage(electionHandler.getError(), exMsg).isPresent(),
                    org.hamcrest.Matchers.is(true));
        } finally {
            electionHandler.close();
            if (electionDriver != null) {
                electionDriver.close();
            }
            if (spiedClient != null) {
                spiedClient.close();
            }
        }
    }

    /**
     * Checks that the leader node disappears once the election driver and its client are
     * closed, and that the retrieval side then receives no further leader within one second.
     *
     * <p>The election driver and its dedicated client are released in the {@code finally}
     * block as well, so a failure before the explicit close in the test body does not leak them.
     */
    @Test
    public void testEphemeralZooKeeperNodes() throws Exception {
        ZooKeeperLeaderElectionDriver leaderElectionDriver = null;
        ZooKeeperLeaderRetrievalDriver leaderRetrievalDriver = null;
        TestingLeaderElectionEventHandler electionEventHandler = new TestingLeaderElectionEventHandler(TEST_LEADER);
        TestingLeaderRetrievalEventHandler retrievalEventHandler = new TestingLeaderRetrievalEventHandler();
        CuratorFramework client = null;
        CuratorFramework client2 = null;
        NodeCache cache = null;
        try {
            // Separate clients for election and retrieval so that closing one does not affect the other.
            client = ZooKeeperUtils.startCuratorFramework(this.configuration);
            client2 = ZooKeeperUtils.startCuratorFramework(this.configuration);

            leaderElectionDriver = this.createAndInitLeaderElectionDriver(client, electionEventHandler);
            leaderRetrievalDriver = ZooKeeperUtils.createLeaderRetrievalDriverFactory(client2, this.configuration)
                    .createLeaderRetrievalDriver(retrievalEventHandler, retrievalEventHandler::handleError);

            String leaderPath = this.configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH);
            cache = new NodeCache(client2, leaderPath);
            ExistsCacheListener existsListener = new ExistsCacheListener(cache);
            DeletedCacheListener deletedCacheListener = new DeletedCacheListener(cache);
            cache.getListenable().addListener(existsListener);
            cache.start();

            electionEventHandler.waitForLeader(timeout);
            retrievalEventHandler.waitForNewLeader(timeout);

            // Wait until the cache has seen the leader node.
            Future<Boolean> existsFuture = existsListener.nodeExists();
            existsFuture.get(timeout, TimeUnit.MILLISECONDS);

            cache.getListenable().addListener(deletedCacheListener);

            // Shut down the election side; null the references so finally does not close them twice.
            leaderElectionDriver.close();
            leaderElectionDriver = null;
            client.close();
            client = null;

            // Wait until the cache has seen the leader node vanish.
            Future<Boolean> deletedFuture = deletedCacheListener.nodeDeleted();
            deletedFuture.get(timeout, TimeUnit.MILLISECONDS);

            try {
                retrievalEventHandler.waitForNewLeader(1000L);
                Assert.fail("TimeoutException was expected because there is no leader registered and thus there shouldn't be any leader information in ZooKeeper.");
            } catch (TimeoutException expected) {
                // Expected: no leader is registered any more.
            }
        } finally {
            electionEventHandler.close();
            if (leaderElectionDriver != null) {
                leaderElectionDriver.close();
            }
            if (leaderRetrievalDriver != null) {
                leaderRetrievalDriver.close();
            }
            if (cache != null) {
                cache.close();
            }
            if (client != null) {
                client.close();
            }
            if (client2 != null) {
                client2.close();
            }
        }
    }

    /**
     * After leadership is revoked via {@code notLeader()}, the handler's confirmed leader
     * information must be empty while a retrieval driver still reads {@code TEST_LEADER}
     * from ZooKeeper.
     */
    @Test
    public void testNotLeaderShouldNotCleanUpTheLeaderInformation() throws Exception {
        final TestingLeaderElectionEventHandler electionHandler = new TestingLeaderElectionEventHandler(TEST_LEADER);
        final TestingLeaderRetrievalEventHandler retrievalHandler = new TestingLeaderRetrievalEventHandler();
        ZooKeeperLeaderElectionDriver electionDriver = null;
        ZooKeeperLeaderRetrievalDriver retrievalDriver = null;

        try {
            electionDriver = createAndInitLeaderElectionDriver(client, electionHandler);

            // Become leader first.
            electionHandler.waitForLeader(timeout);
            Assert.assertThat(electionHandler.getConfirmedLeaderInformation(), org.hamcrest.Matchers.is(TEST_LEADER));

            // Revoke leadership; the handler's own view is cleared.
            electionDriver.notLeader();
            electionHandler.waitForRevokeLeader(timeout);
            Assert.assertThat(electionHandler.getConfirmedLeaderInformation(), org.hamcrest.Matchers.is(LeaderInformation.empty()));

            // The information retrievable from ZooKeeper must still be the old leader.
            retrievalDriver = ZooKeeperUtils.createLeaderRetrievalDriverFactory(client, configuration)
                    .createLeaderRetrievalDriver(retrievalHandler, retrievalHandler::handleError);
            retrievalHandler.waitForNewLeader(timeout);
            Assert.assertThat(retrievalHandler.getLeaderSessionID(), org.hamcrest.Matchers.is(TEST_LEADER.getLeaderSessionID()));
            Assert.assertThat(retrievalHandler.getAddress(), org.hamcrest.Matchers.is(TEST_LEADER.getLeaderAddress()));
        } finally {
            electionHandler.close();
            if (electionDriver != null) {
                electionDriver.close();
            }
            if (retrievalDriver != null) {
                retrievalDriver.close();
            }
        }
    }

    /**
     * Creates a ZooKeeper leader election driver for {@code TEST_URL} on the given client and
     * initialises the event handler with it.
     *
     * @param client Curator client the driver should use
     * @param electionEventHandler handler receiving election events and errors
     * @return the created driver; the caller is responsible for closing it
     * @throws Exception if the driver cannot be created
     */
    private ZooKeeperLeaderElectionDriver createAndInitLeaderElectionDriver(CuratorFramework client, TestingLeaderElectionEventHandler electionEventHandler) throws Exception {
        final ZooKeeperLeaderElectionDriver driver =
                ZooKeeperUtils.createLeaderElectionDriverFactory(client, this.configuration)
                        .createLeaderElectionDriver(electionEventHandler, electionEventHandler::handleError, TEST_URL);
        electionEventHandler.init(driver);
        return driver;
    }

    private static class DeletedCacheListener
    implements NodeCacheListener {
        final CompletableFuture<Boolean> deletedPromise = new CompletableFuture();
        final NodeCache cache;

        public DeletedCacheListener(NodeCache cache) {
            this.cache = cache;
        }

        public Future<Boolean> nodeDeleted() {
            return this.deletedPromise;
        }

        public void nodeChanged() throws Exception {
            ChildData data = this.cache.getCurrentData();
            if (data == null && !this.deletedPromise.isDone()) {
                this.deletedPromise.complete(true);
                this.cache.getListenable().removeListener((Object)this);
            }
        }
    }

    private static class ExistsCacheListener
    implements NodeCacheListener {
        final CompletableFuture<Boolean> existsPromise = new CompletableFuture();
        final NodeCache cache;

        public ExistsCacheListener(NodeCache cache) {
            this.cache = cache;
        }

        public Future<Boolean> nodeExists() {
            return this.existsPromise;
        }

        public void nodeChanged() throws Exception {
            ChildData data = this.cache.getCurrentData();
            if (data != null && !this.existsPromise.isDone()) {
                this.existsPromise.complete(true);
                this.cache.getListenable().removeListener((Object)this);
            }
        }
    }
}

