/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import java.util.Collection;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.runtime.dispatcher.NoOpExecutionPlanListener;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.DefaultExecutionPlanStore;
import org.apache.flink.runtime.jobmanager.ExecutionPlanStore;
import org.apache.flink.runtime.jobmanager.ExecutionPlanStoreUtil;
import org.apache.flink.runtime.jobmanager.ExecutionPlanStoreWatcher;
import org.apache.flink.runtime.jobmanager.TestingExecutionPlanListener;
import org.apache.flink.runtime.jobmanager.ZooKeeperExecutionPlanStoreUtil;
import org.apache.flink.runtime.jobmanager.ZooKeeperExecutionPlanStoreWatcher;
import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowableTypeAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class ZooKeeperExecutionPlansStoreITCase
extends TestLogger {
    private final ZooKeeperExtension zooKeeperExtension = new ZooKeeperExtension();
    @RegisterExtension
    final EachCallbackWrapper<ZooKeeperExtension> zooKeeperResource = new EachCallbackWrapper((CustomExtension)this.zooKeeperExtension);
    @RegisterExtension
    final TestingFatalErrorHandlerExtension testingFatalErrorHandlerResource = new TestingFatalErrorHandlerExtension();
    private static final RetrievableStateStorageHelper<ExecutionPlan> localStateStorage = executionPlan -> {
        ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle(String.valueOf(UUID.randomUUID()), InstantiationUtil.serializeObject((Object)executionPlan));
        return new RetrievableStreamStateHandle((StreamStateHandle)byteStreamStateHandle);
    };

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPutAndRemoveExecutionPlan() throws Exception {
        ExecutionPlanStore executionPlans = this.createZooKeeperExecutionPlanStore("/testPutAndRemoveExecutionPlan");
        try {
            ExecutionPlanStore.ExecutionPlanListener listener = (ExecutionPlanStore.ExecutionPlanListener)Mockito.mock(ExecutionPlanStore.ExecutionPlanListener.class);
            executionPlans.start(listener);
            ExecutionPlan executionPlan = this.createExecutionPlan(new JobID(), "JobName");
            Assertions.assertThat((Collection)executionPlans.getJobIds()).isEmpty();
            executionPlans.putExecutionPlan(executionPlan);
            Collection jobIds = executionPlans.getJobIds();
            Assertions.assertThat((Collection)jobIds).hasSize(1);
            JobID jobId = (JobID)jobIds.iterator().next();
            this.verifyExecutionPlans(executionPlan, executionPlans.recoverExecutionPlan(jobId));
            executionPlan = this.createExecutionPlan(executionPlan.getJobID(), "Updated JobName");
            executionPlans.putExecutionPlan(executionPlan);
            jobIds = executionPlans.getJobIds();
            Assertions.assertThat((Collection)jobIds).hasSize(1);
            jobId = (JobID)jobIds.iterator().next();
            this.verifyExecutionPlans(executionPlan, executionPlans.recoverExecutionPlan(jobId));
            executionPlans.globalCleanupAsync(executionPlan.getJobID(), Executors.directExecutor()).join();
            Assertions.assertThat((Collection)executionPlans.getJobIds()).isEmpty();
            ((ExecutionPlanStore.ExecutionPlanListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.atMost((int)1))).onAddedExecutionPlan((JobID)ArgumentMatchers.any(JobID.class));
            ((ExecutionPlanStore.ExecutionPlanListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onRemovedExecutionPlan((JobID)ArgumentMatchers.any(JobID.class));
            executionPlans.globalCleanupAsync(executionPlan.getJobID(), Executors.directExecutor()).join();
        }
        finally {
            executionPlans.stop();
        }
    }

    @Nonnull
    private ExecutionPlanStore createZooKeeperExecutionPlanStore(String fullPath) throws Exception {
        CuratorFramework client = this.zooKeeperExtension.getZooKeeperClient(this.testingFatalErrorHandlerResource.getTestingFatalErrorHandler());
        client.newNamespaceAwareEnsurePath(fullPath).ensure(client.getZookeeperClient());
        CuratorFramework facade = client.usingNamespace(client.getNamespace() + fullPath);
        ZooKeeperStateHandleStore zooKeeperStateHandleStore = new ZooKeeperStateHandleStore(facade, localStateStorage);
        return new DefaultExecutionPlanStore((StateHandleStore)zooKeeperStateHandleStore, (ExecutionPlanStoreWatcher)new ZooKeeperExecutionPlanStoreWatcher(new PathChildrenCache(facade, "/", false)), (ExecutionPlanStoreUtil)ZooKeeperExecutionPlanStoreUtil.INSTANCE);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRecoverExecutionPlans() throws Exception {
        ExecutionPlanStore executionPlans = this.createZooKeeperExecutionPlanStore("/testRecoverExecutionPlans");
        try {
            ExecutionPlanStore.ExecutionPlanListener listener = (ExecutionPlanStore.ExecutionPlanListener)Mockito.mock(ExecutionPlanStore.ExecutionPlanListener.class);
            executionPlans.start(listener);
            HashMap<JobID, ExecutionPlan> expected = new HashMap<JobID, ExecutionPlan>();
            JobID[] jobIds = new JobID[]{new JobID(), new JobID(), new JobID()};
            expected.put(jobIds[0], this.createExecutionPlan(jobIds[0]));
            expected.put(jobIds[1], this.createExecutionPlan(jobIds[1]));
            expected.put(jobIds[2], this.createExecutionPlan(jobIds[2]));
            for (ExecutionPlan executionPlan : expected.values()) {
                executionPlans.putExecutionPlan(executionPlan);
            }
            Collection actual = executionPlans.getJobIds();
            Assertions.assertThat((Collection)actual).hasSameSizeAs(expected.entrySet());
            for (JobID jobId : actual) {
                ExecutionPlan executionPlan = executionPlans.recoverExecutionPlan(jobId);
                Assertions.assertThat(expected).containsKey((Object)executionPlan.getJobID());
                this.verifyExecutionPlans((ExecutionPlan)expected.get(executionPlan.getJobID()), executionPlan);
                executionPlans.globalCleanupAsync(executionPlan.getJobID(), Executors.directExecutor()).join();
            }
            Assertions.assertThat((Collection)executionPlans.getJobIds()).isEmpty();
            ((ExecutionPlanStore.ExecutionPlanListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.atMost((int)expected.size()))).onAddedExecutionPlan((JobID)ArgumentMatchers.any(JobID.class));
            ((ExecutionPlanStore.ExecutionPlanListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onRemovedExecutionPlan((JobID)ArgumentMatchers.any(JobID.class));
        }
        finally {
            executionPlans.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConcurrentAddExecutionPlan() throws Exception {
        ExecutionPlanStore executionPlans = null;
        ExecutionPlanStore otherExecutionPlans = null;
        try {
            executionPlans = this.createZooKeeperExecutionPlanStore("/testConcurrentAddExecutionPlan");
            otherExecutionPlans = this.createZooKeeperExecutionPlanStore("/testConcurrentAddExecutionPlan");
            ExecutionPlan executionPlan = this.createExecutionPlan(new JobID());
            ExecutionPlan otherExecutionPlan = this.createExecutionPlan(new JobID());
            ExecutionPlanStore.ExecutionPlanListener listener = (ExecutionPlanStore.ExecutionPlanListener)Mockito.mock(ExecutionPlanStore.ExecutionPlanListener.class);
            final JobID[] actualOtherJobId = new JobID[1];
            final CountDownLatch sync = new CountDownLatch(1);
            ((ExecutionPlanStore.ExecutionPlanListener)Mockito.doAnswer((Answer)new Answer<Void>(){

                public Void answer(InvocationOnMock invocation) throws Throwable {
                    actualOtherJobId[0] = (JobID)invocation.getArguments()[0];
                    sync.countDown();
                    return null;
                }
            }).when((Object)listener)).onAddedExecutionPlan((JobID)ArgumentMatchers.any(JobID.class));
            executionPlans.start(listener);
            otherExecutionPlans.start((ExecutionPlanStore.ExecutionPlanListener)NoOpExecutionPlanListener.INSTANCE);
            executionPlans.putExecutionPlan(executionPlan);
            ((ExecutionPlanStore.ExecutionPlanListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onAddedExecutionPlan((JobID)ArgumentMatchers.any(JobID.class));
            ((ExecutionPlanStore.ExecutionPlanListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onRemovedExecutionPlan((JobID)ArgumentMatchers.any(JobID.class));
            otherExecutionPlans.putExecutionPlan(otherExecutionPlan);
            sync.await();
            ((ExecutionPlanStore.ExecutionPlanListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.times((int)1))).onAddedExecutionPlan((JobID)ArgumentMatchers.any(JobID.class));
            ((ExecutionPlanStore.ExecutionPlanListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onRemovedExecutionPlan((JobID)ArgumentMatchers.any(JobID.class));
            Assertions.assertThat((Comparable)actualOtherJobId[0]).isEqualTo((Object)otherExecutionPlan.getJobID());
        }
        finally {
            if (executionPlans != null) {
                executionPlans.stop();
            }
            if (otherExecutionPlans != null) {
                otherExecutionPlans.stop();
            }
        }
    }

    @Test
    public void testUpdateExecutionPlanYouDidNotGetOrAdd() throws Exception {
        ExecutionPlanStore executionPlans = this.createZooKeeperExecutionPlanStore("/testUpdateExecutionPlanYouDidNotGetOrAdd");
        ExecutionPlanStore otherExecutionPlans = this.createZooKeeperExecutionPlanStore("/testUpdateExecutionPlanYouDidNotGetOrAdd");
        executionPlans.start((ExecutionPlanStore.ExecutionPlanListener)NoOpExecutionPlanListener.INSTANCE);
        otherExecutionPlans.start((ExecutionPlanStore.ExecutionPlanListener)NoOpExecutionPlanListener.INSTANCE);
        ExecutionPlan executionPlan = this.createExecutionPlan(new JobID());
        executionPlans.putExecutionPlan(executionPlan);
        Assertions.assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> otherExecutionPlans.putExecutionPlan(executionPlan));
    }

    @Test
    public void testExecutionPlanRemovalFailureAndLockRelease() throws Exception {
        ExecutionPlanStore submittedExecutionPlanStore = this.createZooKeeperExecutionPlanStore("/testConcurrentAddExecutionPlan");
        ExecutionPlanStore otherSubmittedExecutionPlanStore = this.createZooKeeperExecutionPlanStore("/testConcurrentAddExecutionPlan");
        TestingExecutionPlanListener listener = new TestingExecutionPlanListener();
        submittedExecutionPlanStore.start((ExecutionPlanStore.ExecutionPlanListener)listener);
        otherSubmittedExecutionPlanStore.start((ExecutionPlanStore.ExecutionPlanListener)listener);
        JobGraph executionPlan = JobGraphTestUtils.emptyJobGraph();
        submittedExecutionPlanStore.putExecutionPlan((ExecutionPlan)executionPlan);
        ExecutionPlan recoveredExecutionPlan = otherSubmittedExecutionPlanStore.recoverExecutionPlan(executionPlan.getJobID());
        Assertions.assertThat((Object)recoveredExecutionPlan).isNotNull();
        ((ThrowableTypeAssert)Assertions.assertThatExceptionOfType(Exception.class).as("It should not be possible to remove the ExecutionPlan since the first store still has a lock on it.", new Object[0])).isThrownBy(() -> otherSubmittedExecutionPlanStore.globalCleanupAsync(recoveredExecutionPlan.getJobID(), Executors.directExecutor()).join());
        submittedExecutionPlanStore.stop();
        otherSubmittedExecutionPlanStore.globalCleanupAsync(recoveredExecutionPlan.getJobID(), Executors.directExecutor()).join();
        Assertions.assertThat((Object)otherSubmittedExecutionPlanStore.recoverExecutionPlan(recoveredExecutionPlan.getJobID())).isNull();
        otherSubmittedExecutionPlanStore.stop();
    }

    private ExecutionPlan createExecutionPlan(JobID jobId) {
        return this.createExecutionPlan(jobId, "Test ExecutionPlan");
    }

    private ExecutionPlan createExecutionPlan(JobID jobId, String jobName) {
        JobVertex jobVertex = new JobVertex("Test JobVertex");
        jobVertex.setParallelism(1);
        return JobGraphBuilder.newStreamingJobGraphBuilder().setJobName(jobName).setJobId(jobId).addJobVertex(jobVertex).build();
    }

    private void verifyExecutionPlans(ExecutionPlan expected, ExecutionPlan actual) {
        Assertions.assertThat((String)actual.getName()).isEqualTo(expected.getName());
        Assertions.assertThat((Comparable)actual.getJobID()).isEqualTo((Object)expected.getJobID());
    }
}

