/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.FlinkCompletableFutureAssert;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounterTestBase;
import org.apache.flink.runtime.checkpoint.DefaultLastStateConnectionStateListener;
import org.apache.flink.runtime.checkpoint.LastStateConnectionStateListener;
import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Integration tests for {@link ZooKeeperCheckpointIDCounter}.
 *
 * <p>Each test creates a counter on top of the {@link CuratorFramework} client handed out by the
 * registered {@link ZooKeeperExtension} and then inspects the counter's ZooKeeper node directly
 * through a client to verify what {@code shutdown} left behind.
 */
class ZooKeeperCheckpointIDCounterITCase extends CheckpointIDCounterTestBase {

    private final ZooKeeperExtension zooKeeperExtension = new ZooKeeperExtension();

    /** Registers {@link #zooKeeperExtension} with JUnit through the per-test callback wrapper. */
    @RegisterExtension
    final EachCallbackWrapper<ZooKeeperExtension> zooKeeperResource =
            new EachCallbackWrapper<>(zooKeeperExtension);

    /** Supplies the fatal error handler that is passed along when requesting a client. */
    @RegisterExtension
    final TestingFatalErrorHandlerExtension testingFatalErrorHandlerResource =
            new TestingFatalErrorHandlerExtension();

    /**
     * Returns the ZooKeeper client of the extension, wired to the testing fatal error handler.
     *
     * @return the client provided by {@link ZooKeeperExtension#getZooKeeperClient}
     */
    private CuratorFramework getZooKeeperClient() {
        return zooKeeperExtension.getZooKeeperClient(
                testingFatalErrorHandlerResource.getTestingFatalErrorHandler());
    }

    /** The counter node exists after start and is gone after a shutdown with FINISHED. */
    @Test
    void testShutdownRemovesState() throws Exception {
        ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter();
        counter.start();

        CuratorFramework client = getZooKeeperClient();
        Assertions.assertThat(client.checkExists().forPath(counter.getPath())).isNotNull();

        counter.shutdown(JobStatus.FINISHED).join();
        Assertions.assertThat(client.checkExists().forPath(counter.getPath())).isNull();
    }

    /** Shutting down twice with FINISHED completes both times and leaves no counter node. */
    @Test
    void testIdempotentShutdown() throws Exception {
        ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter();
        counter.start();

        CuratorFramework client = getZooKeeperClient();
        counter.shutdown(JobStatus.FINISHED).join();

        // The second call must complete normally as well; join() would rethrow any failure.
        counter.shutdown(JobStatus.FINISHED).join();

        Assertions.assertThat(client.checkExists().forPath(counter.getPath())).isNull();
    }

    /**
     * Closes the ZooKeeper extension before shutting the counter down and expects the shutdown
     * future to fail with an {@link ExecutionException} caused by an {@link
     * IllegalStateException}.
     */
    @Test
    void testShutdownWithFailureDueToMissingConnection() throws Exception {
        ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter();
        counter.start();

        zooKeeperExtension.close();

        FlinkAssertions.assertThatFuture(counter.shutdown(JobStatus.FINISHED))
                .as("The shutdown should fail because of the client connection being dropped.")
                .eventuallyFailsWith(ExecutionException.class)
                .withCauseInstanceOf(IllegalStateException.class);
    }

    /**
     * Creates a child below the counter node so that the first shutdown fails with a {@code
     * NOTEMPTY} {@link KeeperException}; after deleting the child, a second shutdown is expected
     * to succeed and remove the counter node.
     */
    @Test
    void testShutdownWithFailureDueToExistingChildNodes() throws Exception {
        ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter();
        counter.start();

        CuratorFramework client =
                ZooKeeperUtils.useNamespaceAndEnsurePath(getZooKeeperClient(), "/");
        String counterNodePath = ZooKeeperUtils.generateZookeeperPath(counter.getPath());
        String childNodePath =
                ZooKeeperUtils.generateZookeeperPath(
                        counterNodePath, "unexpected-child-node-causing-a-failure");
        client.create().forPath(childNodePath);

        // The expected exception is built for the counter path prefixed with the client namespace.
        String namespacedCounterNodePath =
                ZooKeeperUtils.generateZookeeperPath(client.getNamespace(), counterNodePath);
        KeeperException expectedRootCause =
                KeeperException.create(KeeperException.Code.NOTEMPTY, namespacedCounterNodePath);

        FlinkAssertions.assertThatFuture(counter.shutdown(JobStatus.FINISHED))
                .as("The shutdown should fail because of a child node being present and the shutdown not performing an explicit recursive deletion.")
                .eventuallyFailsWith(ExecutionException.class)
                .havingCause()
                .withCause(expectedRootCause);

        // Remove the obstacle and retry the shutdown.
        client.delete().forPath(childNodePath);
        counter.shutdown(JobStatus.FINISHED).join();

        Assertions.assertThat(client.checkExists().forPath(counterNodePath))
                .as("A retry of the shutdown should have worked now after the root cause was resolved.")
                .isNull();
    }

    /** The counter node still exists after a shutdown with SUSPENDED. */
    @Test
    void testSuspendKeepsState() throws Exception {
        ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter();
        counter.start();

        CuratorFramework client = getZooKeeperClient();
        Assertions.assertThat(client.checkExists().forPath(counter.getPath())).isNotNull();

        counter.shutdown(JobStatus.SUSPENDED).join();
        Assertions.assertThat(client.checkExists().forPath(counter.getPath())).isNotNull();
    }

    /**
     * Creates the counter under test on a client namespaced to {@code "/"}, using a {@link
     * DefaultLastStateConnectionStateListener}.
     *
     * @return a new, not yet started {@link ZooKeeperCheckpointIDCounter}
     * @throws Exception if obtaining the namespaced client or constructing the counter fails
     */
    @Override
    protected ZooKeeperCheckpointIDCounter createCheckpointIdCounter() throws Exception {
        return new ZooKeeperCheckpointIDCounter(
                ZooKeeperUtils.useNamespaceAndEnsurePath(getZooKeeperClient(), "/"),
                new DefaultLastStateConnectionStateListener());
    }
}

