/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRequestDecider;
import org.apache.flink.runtime.checkpoint.CheckpointRequestDeciderTest;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStoreUtil;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest;
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointStoreUtil;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.function.SupplierWithException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ZooKeeperCompletedCheckpointStoreITCase
extends CompletedCheckpointStoreTest {
    private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(1);
    private static final String CHECKPOINT_PATH = "/checkpoints";
    private static final ZooKeeperCheckpointStoreUtil checkpointStoreUtil = ZooKeeperCheckpointStoreUtil.INSTANCE;

    @AfterClass
    public static void tearDown() throws Exception {
        ZOOKEEPER.shutdown();
    }

    @Before
    public void cleanUp() throws Exception {
        ZOOKEEPER.deleteAll();
    }

    @Override
    protected CompletedCheckpointStore createRecoveredCompletedCheckpointStore(int maxNumberOfCheckpointsToRetain, Executor executor) throws Exception {
        ZooKeeperStateHandleStore checkpointsInZooKeeper = ZooKeeperUtils.createZooKeeperStateHandleStore((CuratorFramework)ZOOKEEPER.getClient(), (String)CHECKPOINT_PATH, new TestingRetrievableStateStorageHelper());
        return new DefaultCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, (StateHandleStore)checkpointsInZooKeeper, (CheckpointStoreUtil)checkpointStoreUtil, DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints((StateHandleStore)checkpointsInZooKeeper, (CheckpointStoreUtil)checkpointStoreUtil), SharedStateRegistry.DEFAULT_FACTORY.create(Executors.directExecutor(), Collections.emptyList(), RestoreMode.DEFAULT), executor);
    }

    @Test
    public void testRecover() throws Exception {
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStore checkpoints = this.createRecoveredCompletedCheckpointStore(3);
        CompletedCheckpointStoreTest.TestCompletedCheckpoint[] expected = new CompletedCheckpointStoreTest.TestCompletedCheckpoint[]{ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(0L, (SharedStateRegistry)sharedStateRegistry), ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(1L, (SharedStateRegistry)sharedStateRegistry), ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(2L, (SharedStateRegistry)sharedStateRegistry)};
        checkpoints.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)expected[0], new CheckpointsCleaner(), () -> {});
        checkpoints.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)expected[1], new CheckpointsCleaner(), () -> {});
        checkpoints.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)expected[2], new CheckpointsCleaner(), () -> {});
        this.verifyCheckpointRegistered(expected[0].getOperatorStates().values(), (SharedStateRegistry)sharedStateRegistry);
        this.verifyCheckpointRegistered(expected[1].getOperatorStates().values(), (SharedStateRegistry)sharedStateRegistry);
        this.verifyCheckpointRegistered(expected[2].getOperatorStates().values(), (SharedStateRegistry)sharedStateRegistry);
        Assert.assertEquals((long)3L, (long)((List)ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH)).size());
        Assert.assertEquals((long)3L, (long)checkpoints.getNumberOfRetainedCheckpoints());
        sharedStateRegistry.close();
        sharedStateRegistry = new SharedStateRegistryImpl();
        Assert.assertEquals((long)3L, (long)((List)ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH)).size());
        Assert.assertEquals((long)3L, (long)checkpoints.getNumberOfRetainedCheckpoints());
        Assert.assertEquals((Object)((Object)expected[2]), (Object)checkpoints.getLatestCheckpoint());
        ArrayList<CompletedCheckpointStoreTest.TestCompletedCheckpoint> expectedCheckpoints = new ArrayList<CompletedCheckpointStoreTest.TestCompletedCheckpoint>(3);
        expectedCheckpoints.add(expected[1]);
        expectedCheckpoints.add(expected[2]);
        expectedCheckpoints.add(ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(3L, (SharedStateRegistry)sharedStateRegistry));
        checkpoints.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)expectedCheckpoints.get(2), new CheckpointsCleaner(), () -> {});
        List actualCheckpoints = checkpoints.getAllCheckpoints();
        Assert.assertEquals(expectedCheckpoints, (Object)actualCheckpoints);
        for (CompletedCheckpoint actualCheckpoint : actualCheckpoints) {
            this.verifyCheckpointRegistered(actualCheckpoint.getOperatorStates().values(), (SharedStateRegistry)sharedStateRegistry);
        }
    }

    @Test
    public void testShutdownDiscardsCheckpoints() throws Exception {
        CuratorFramework client = ZOOKEEPER.getClient();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStore store = this.createRecoveredCompletedCheckpointStore(1);
        CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint = ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(0L, (SharedStateRegistry)sharedStateRegistry);
        store.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)checkpoint, new CheckpointsCleaner(), () -> {});
        Assert.assertEquals((long)1L, (long)store.getNumberOfRetainedCheckpoints());
        Assert.assertNotNull((Object)client.checkExists().forPath(CHECKPOINT_PATH + checkpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID())));
        store.shutdown(JobStatus.FINISHED, new CheckpointsCleaner());
        Assert.assertEquals((long)0L, (long)store.getNumberOfRetainedCheckpoints());
        Assert.assertNull((Object)client.checkExists().forPath(CHECKPOINT_PATH + checkpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID())));
        sharedStateRegistry.close();
        Assert.assertEquals((long)0L, (long)this.createRecoveredCompletedCheckpointStore(1).getNumberOfRetainedCheckpoints());
    }

    @Test
    public void testSuspendKeepsCheckpoints() throws Exception {
        CuratorFramework client = ZOOKEEPER.getClient();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStore store = this.createRecoveredCompletedCheckpointStore(1);
        CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint = ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(0L, (SharedStateRegistry)sharedStateRegistry);
        store.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)checkpoint, new CheckpointsCleaner(), () -> {});
        Assert.assertEquals((long)1L, (long)store.getNumberOfRetainedCheckpoints());
        Assert.assertNotNull((Object)client.checkExists().forPath(CHECKPOINT_PATH + checkpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID())));
        store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner());
        Assert.assertEquals((long)0L, (long)store.getNumberOfRetainedCheckpoints());
        String checkpointPath = CHECKPOINT_PATH + checkpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID());
        List checkpointPathChildren = (List)client.getChildren().forPath(checkpointPath);
        Assert.assertEquals((String)"The checkpoint node should not be marked for deletion.", (long)1L, (long)checkpointPathChildren.size());
        String locksNodeName = (String)Iterables.getOnlyElement((Iterable)checkpointPathChildren);
        String locksNodePath = ZooKeeperUtils.generateZookeeperPath((String[])new String[]{checkpointPath, locksNodeName});
        Stat locksStat = (Stat)client.checkExists().forPath(locksNodePath);
        Assert.assertEquals((String)"There shouldn't be any lock node available for the checkpoint", (long)0L, (long)locksStat.getNumChildren());
        sharedStateRegistry.close();
        store = this.createRecoveredCompletedCheckpointStore(1);
        CompletedCheckpoint recovered = store.getLatestCheckpoint();
        Assert.assertEquals((Object)((Object)checkpoint), (Object)recovered);
    }

    @Test
    public void testLatestCheckpointRecovery() throws Exception {
        int numCheckpoints = 3;
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStore checkpointStore = this.createRecoveredCompletedCheckpointStore(3);
        ArrayList<CompletedCheckpointStoreTest.TestCompletedCheckpoint> checkpoints = new ArrayList<CompletedCheckpointStoreTest.TestCompletedCheckpoint>(3);
        checkpoints.add(ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(9L, (SharedStateRegistry)sharedStateRegistry));
        checkpoints.add(ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(10L, (SharedStateRegistry)sharedStateRegistry));
        checkpoints.add(ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(11L, (SharedStateRegistry)sharedStateRegistry));
        for (CompletedCheckpoint completedCheckpoint : checkpoints) {
            checkpointStore.addCheckpointAndSubsumeOldestOne(completedCheckpoint, new CheckpointsCleaner(), () -> {});
        }
        sharedStateRegistry.close();
        CompletedCheckpoint latestCheckpoint = this.createRecoveredCompletedCheckpointStore(3).getLatestCheckpoint();
        Assert.assertEquals(checkpoints.get(checkpoints.size() - 1), (Object)latestCheckpoint);
    }

    @Test
    public void testConcurrentCheckpointOperations() throws Exception {
        boolean numberOfCheckpoints = true;
        long waitingTimeout = 50L;
        CompletedCheckpointStore zkCheckpointStore1 = this.createRecoveredCompletedCheckpointStore(1);
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStoreTest.TestCompletedCheckpoint completedCheckpoint = ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(1L, (SharedStateRegistry)sharedStateRegistry);
        zkCheckpointStore1.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)completedCheckpoint, new CheckpointsCleaner(), () -> {});
        sharedStateRegistry.close();
        sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStore zkCheckpointStore2 = this.createRecoveredCompletedCheckpointStore(1);
        CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint();
        Assert.assertTrue((boolean)(recoveredCheckpoint instanceof CompletedCheckpointStoreTest.TestCompletedCheckpoint));
        CompletedCheckpointStoreTest.TestCompletedCheckpoint recoveredTestCheckpoint = (CompletedCheckpointStoreTest.TestCompletedCheckpoint)recoveredCheckpoint;
        Assert.assertFalse((boolean)recoveredTestCheckpoint.isDiscarded());
        CompletedCheckpointStoreTest.TestCompletedCheckpoint completedCheckpoint2 = ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(2L, (SharedStateRegistry)sharedStateRegistry);
        zkCheckpointStore1.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)completedCheckpoint2, new CheckpointsCleaner(), () -> {});
        List allCheckpoints = zkCheckpointStore1.getAllCheckpoints();
        Assert.assertEquals(Collections.singletonList(completedCheckpoint2), (Object)allCheckpoints);
        Assert.assertFalse((String)"The checkpoint should not have been discarded.", (boolean)recoveredTestCheckpoint.awaitDiscard(50L));
        Assert.assertFalse((boolean)recoveredTestCheckpoint.isDiscarded());
        CompletedCheckpointStoreTest.TestCompletedCheckpoint completedCheckpoint3 = ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(3L, (SharedStateRegistry)sharedStateRegistry);
        zkCheckpointStore2.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)completedCheckpoint3, new CheckpointsCleaner(), () -> {});
        recoveredTestCheckpoint.awaitDiscard();
    }

    @Test
    public void testChekpointingPausesAndResumeWhenTooManyCheckpoints() throws Exception {
        ManualClock clock = new ManualClock();
        clock.advanceTime(1L, TimeUnit.DAYS);
        int maxCleaningCheckpoints = 1;
        CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner();
        CheckpointRequestDecider checkpointRequestDecider = new CheckpointRequestDecider(maxCleaningCheckpoints, unused -> {}, (Clock)clock, 1L, new AtomicInteger(0)::get, () -> ((CheckpointsCleaner)checkpointsCleaner).getNumberOfCheckpointsToClean());
        boolean maxCheckpointsToRetain = true;
        ManuallyTriggeredScheduledExecutor executor = new ManuallyTriggeredScheduledExecutor();
        CompletedCheckpointStore checkpointStore = this.createRecoveredCompletedCheckpointStore(1, (Executor)executor);
        int nbCheckpointsToInject = 3;
        for (int i = 1; i <= nbCheckpointsToInject; ++i) {
            CompletedCheckpointStoreTest.TestCompletedCheckpoint completedCheckpoint = new CompletedCheckpointStoreTest.TestCompletedCheckpoint(new JobID(), i, i, Collections.emptyMap(), CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.RETAIN_ON_FAILURE));
            checkpointStore.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)completedCheckpoint, checkpointsCleaner, () -> {});
        }
        int nbCheckpointsSubmittedForCleaning = nbCheckpointsToInject - 1;
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> checkpointsCleaner.getNumberOfCheckpointsToClean() == nbCheckpointsSubmittedForCleaning));
        Assert.assertEquals((long)nbCheckpointsSubmittedForCleaning, (long)checkpointsCleaner.getNumberOfCheckpointsToClean());
        Assert.assertFalse((boolean)checkpointRequestDecider.chooseRequestToExecute(CheckpointRequestDeciderTest.regularCheckpoint(), false, 0L).isPresent());
        executor.triggerAll();
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> checkpointsCleaner.getNumberOfCheckpointsToClean() < nbCheckpointsSubmittedForCleaning));
        Assert.assertTrue((checkpointsCleaner.getNumberOfCheckpointsToClean() < nbCheckpointsSubmittedForCleaning ? 1 : 0) != 0);
        Assert.assertTrue((boolean)checkpointRequestDecider.chooseRequestToExecute(CheckpointRequestDeciderTest.regularCheckpoint(), false, 0L).isPresent());
        checkpointStore.shutdown(JobStatus.FINISHED, checkpointsCleaner);
    }

    static class HeapRetrievableStateHandle<T extends Serializable>
    implements RetrievableStateHandle<T> {
        private static final long serialVersionUID = -268548467968932L;
        private static AtomicInteger nextKey = new AtomicInteger(0);
        private static HashMap<Integer, Object> stateMap = new HashMap();
        private final int key = nextKey.getAndIncrement();

        public HeapRetrievableStateHandle(T state) {
            stateMap.put(this.key, state);
        }

        public T retrieveState() {
            return (T)((Serializable)stateMap.get(this.key));
        }

        public void discardState() throws Exception {
            stateMap.remove(this.key);
        }

        public long getStateSize() {
            return 0L;
        }
    }
}

