/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStoreUtil;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest;
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointStoreUtil;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class ZooKeeperCompletedCheckpointStoreTest {
    @RegisterExtension
    public static AllCallbackWrapper<ZooKeeperExtension> zooKeeperExtensionWrapper = new AllCallbackWrapper((CustomExtension)new ZooKeeperExtension());
    private static final ZooKeeperCheckpointStoreUtil zooKeeperCheckpointStoreUtil = ZooKeeperCheckpointStoreUtil.INSTANCE;

    ZooKeeperCompletedCheckpointStoreTest() {
    }

    @Test
    void testPathConversion() {
        long checkpointId = 42L;
        String path = zooKeeperCheckpointStoreUtil.checkpointIDToName(42L);
        Assertions.assertThat((long)zooKeeperCheckpointStoreUtil.nameToCheckpointID(path)).isEqualTo(42L);
    }

    @Test
    void testRecoverFailsIfDownloadFails() {
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ((ZooKeeperExtension)zooKeeperExtensionWrapper.getCustomExtension()).getConnectString());
        final ArrayList<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointsInZk = new ArrayList<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>>();
        try (CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework((Configuration)configuration, (FatalErrorHandler)NoOpFatalErrorHandler.INSTANCE);){
            ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper = new ZooKeeperStateHandleStore<CompletedCheckpoint>(curatorFrameworkWrapper.asCuratorFramework(), new TestingRetrievableStateStorageHelper()){

                public List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> getAllAndLock() {
                    return checkpointsInZk;
                }
            };
            checkpointsInZk.add(this.createHandle(1L, id -> {
                throw new ExpectedTestException();
            }));
            Assertions.assertThatThrownBy(() -> ZooKeeperCompletedCheckpointStoreTest.lambda$testRecoverFailsIfDownloadFails$1((ZooKeeperStateHandleStore)checkpointsInZooKeeper)).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(ExpectedTestException.class)});
        }
    }

    private Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> createHandle(long id, Function<Long, CompletedCheckpoint> checkpointSupplier) {
        return Tuple2.of((Object)new CheckpointStateHandle(checkpointSupplier, id), (Object)zooKeeperCheckpointStoreUtil.checkpointIDToName(id));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testDiscardingSubsumedCheckpoints() throws Exception {
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ((ZooKeeperExtension)zooKeeperExtensionWrapper.getCustomExtension()).getConnectString());
        CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework((Configuration)configuration, (FatalErrorHandler)NoOpFatalErrorHandler.INSTANCE);
        CompletedCheckpointStore checkpointStore = this.createZooKeeperCheckpointStore(curatorFrameworkWrapper.asCuratorFramework());
        try {
            CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = CompletedCheckpointStoreTest.createCheckpoint(0L, (SharedStateRegistry)sharedStateRegistry);
            checkpointStore.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)checkpoint1, new CheckpointsCleaner(), () -> {});
            Assertions.assertThat((List)checkpointStore.getAllCheckpoints()).containsExactly((Object[])new CompletedCheckpoint[]{checkpoint1});
            CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint2 = CompletedCheckpointStoreTest.createCheckpoint(1L, (SharedStateRegistry)sharedStateRegistry);
            checkpointStore.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)checkpoint2, new CheckpointsCleaner(), () -> {});
            List allCheckpoints = checkpointStore.getAllCheckpoints();
            Assertions.assertThat((List)allCheckpoints).containsExactly((Object[])new CompletedCheckpoint[]{checkpoint2});
            CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
        }
        finally {
            curatorFrameworkWrapper.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testDiscardingCheckpointsAtShutDown() throws Exception {
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ((ZooKeeperExtension)zooKeeperExtensionWrapper.getCustomExtension()).getConnectString());
        CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework((Configuration)configuration, (FatalErrorHandler)NoOpFatalErrorHandler.INSTANCE);
        CompletedCheckpointStore checkpointStore = this.createZooKeeperCheckpointStore(curatorFrameworkWrapper.asCuratorFramework());
        try {
            CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = CompletedCheckpointStoreTest.createCheckpoint(0L, (SharedStateRegistry)sharedStateRegistry);
            checkpointStore.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)checkpoint1, new CheckpointsCleaner(), () -> {});
            Assertions.assertThat((List)checkpointStore.getAllCheckpoints()).containsExactly((Object[])new CompletedCheckpoint[]{checkpoint1});
            checkpointStore.shutdown(JobStatus.FINISHED, new CheckpointsCleaner());
            CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
        }
        finally {
            curatorFrameworkWrapper.close();
        }
    }

    @Nonnull
    private CompletedCheckpointStore createZooKeeperCheckpointStore(CuratorFramework client) throws Exception {
        ZooKeeperStateHandleStore checkpointsInZooKeeper = ZooKeeperUtils.createZooKeeperStateHandleStore((CuratorFramework)client, (String)"/checkpoints", new TestingRetrievableStateStorageHelper());
        return new DefaultCompletedCheckpointStore(1, (StateHandleStore)checkpointsInZooKeeper, (CheckpointStoreUtil)zooKeeperCheckpointStoreUtil, Collections.emptyList(), SharedStateRegistry.DEFAULT_FACTORY.create(Executors.directExecutor(), Collections.emptyList(), RestoreMode.DEFAULT), Executors.directExecutor());
    }

    @Test
    void testAddCheckpointWithFailedRemove() throws Exception {
        boolean numCheckpointsToRetain = true;
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ((ZooKeeperExtension)zooKeeperExtensionWrapper.getCustomExtension()).getConnectString());
        try (CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework((Configuration)configuration, (FatalErrorHandler)NoOpFatalErrorHandler.INSTANCE);){
            CompletedCheckpointStore store = this.createZooKeeperCheckpointStore(curatorFrameworkWrapper.asCuratorFramework());
            CountDownLatch discardAttempted = new CountDownLatch(1);
            for (long i = 0L; i < 2L; ++i) {
                CompletedCheckpoint checkpointToAdd = new CompletedCheckpoint(new JobID(), i, i, i, Collections.emptyMap(), Collections.emptyList(), CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (CompletedCheckpointStorageLocation)new TestCompletedCheckpointStorageLocation(), null);
                store.addCheckpointAndSubsumeOldestOne(checkpointToAdd, new CheckpointsCleaner(), () -> {
                    discardAttempted.countDown();
                    throw new RuntimeException();
                });
            }
            discardAttempted.await();
        }
    }

    private static /* synthetic */ void lambda$testRecoverFailsIfDownloadFails$1(ZooKeeperStateHandleStore checkpointsInZooKeeper) throws Throwable {
        DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints((StateHandleStore)checkpointsInZooKeeper, (CheckpointStoreUtil)zooKeeperCheckpointStoreUtil);
    }

    private static class CheckpointStateHandle
    implements RetrievableStateHandle<CompletedCheckpoint> {
        private static final long serialVersionUID = 1L;
        private final Function<Long, CompletedCheckpoint> checkpointSupplier;
        private final long id;

        CheckpointStateHandle(Function<Long, CompletedCheckpoint> checkpointSupplier, long id) {
            this.checkpointSupplier = checkpointSupplier;
            this.id = id;
        }

        public CompletedCheckpoint retrieveState() {
            return this.checkpointSupplier.apply(this.id);
        }

        public void discardState() {
        }

        public long getStateSize() {
            return 0L;
        }
    }
}

