/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import java.io.Serializable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStoreWatcher;
import org.apache.flink.runtime.jobmanager.TestingJobGraphListener;
import org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStoreWatcher;
import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
import org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper;
import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class ZooKeeperJobGraphStoreWatcherTest
extends TestLogger {
    @Rule
    public ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    private Configuration configuration;
    private TestingJobGraphListener testingJobGraphListener;

    @Before
    public void setup() throws Exception {
        this.configuration = new Configuration();
        this.configuration.set(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, (Object)this.zooKeeperResource.getConnectString());
        this.configuration.set(HighAvailabilityOptions.HA_STORAGE_PATH, (Object)this.temporaryFolder.newFolder().getAbsolutePath());
        this.testingJobGraphListener = new TestingJobGraphListener();
    }

    @Test
    public void testJobGraphAddedAndRemovedShouldNotifyGraphStoreListener() throws Exception {
        try (CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework((Configuration)this.configuration, (FatalErrorHandler)NoOpFatalErrorHandler.INSTANCE);){
            CuratorFramework client = curatorFrameworkWrapper.asCuratorFramework();
            JobGraphStoreWatcher jobGraphStoreWatcher = this.createAndStartJobGraphStoreWatcher(client);
            ZooKeeperStateHandleStore<JobGraph> stateHandleStore = this.createStateHandleStore(client);
            JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();
            JobID jobID = jobGraph.getJobID();
            stateHandleStore.addAndLock("/" + jobID, (Serializable)jobGraph);
            CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> this.testingJobGraphListener.getAddedJobGraphs().size() > 0));
            Assert.assertThat(this.testingJobGraphListener.getAddedJobGraphs(), (Matcher)Matchers.contains((Object[])new JobID[]{jobID}));
            stateHandleStore.releaseAndTryRemove("/" + jobID);
            CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> this.testingJobGraphListener.getRemovedJobGraphs().size() > 0));
            Assert.assertThat(this.testingJobGraphListener.getRemovedJobGraphs(), (Matcher)Matchers.contains((Object[])new JobID[]{jobID}));
            jobGraphStoreWatcher.stop();
        }
    }

    private JobGraphStoreWatcher createAndStartJobGraphStoreWatcher(CuratorFramework client) throws Exception {
        ZooKeeperJobGraphStoreWatcher jobGraphStoreWatcher = new ZooKeeperJobGraphStoreWatcher(new PathChildrenCache(client, "/", false));
        jobGraphStoreWatcher.start((JobGraphStore.JobGraphListener)this.testingJobGraphListener);
        return jobGraphStoreWatcher;
    }

    private ZooKeeperStateHandleStore<JobGraph> createStateHandleStore(CuratorFramework client) throws Exception {
        FileSystemStateStorageHelper stateStorage = ZooKeeperUtils.createFileSystemStateStorage((Configuration)this.configuration, (String)"test_jobgraph");
        return new ZooKeeperStateHandleStore(client, (RetrievableStateStorageHelper)stateStorage);
    }
}

