/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStoreUtil;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest;
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
import org.apache.flink.runtime.persistence.TestingRetrievableStateStorageHelper;
import org.apache.flink.runtime.persistence.TestingStateHandleStore;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link DefaultCompletedCheckpointStore} backed by a {@link TestingStateHandleStore}.
 *
 * <p>Each test configures the shared {@link TestingStateHandleStore.Builder} with the callbacks it
 * needs, builds a store through {@link #createCompletedCheckpointStore}, and then checks which
 * checkpoints the store reports and which state-handle-store callbacks were invoked.
 */
public class DefaultCompletedCheckpointStoreTest extends TestLogger {

    /** Upper bound, in milliseconds, for waiting on futures completed by store callbacks. */
    private static final long TIMEOUT_MS = 100L;

    private TestingStateHandleStore.Builder<CompletedCheckpoint> builder;
    private TestingRetrievableStateStorageHelper<CompletedCheckpoint> checkpointStorageHelper;
    private ExecutorService executorService;

    /** Creates a fresh builder, storage helper and two-thread executor before every test. */
    @Before
    public void setup() {
        builder = TestingStateHandleStore.newBuilder();
        checkpointStorageHelper = new TestingRetrievableStateStorageHelper<>();
        executorService =
                Executors.newFixedThreadPool(2, new ExecutorThreadFactory("IO-Executor"));
    }

    /** Stops the executor created in {@link #setup()}. */
    @After
    public void after() {
        executorService.shutdownNow();
    }

    /**
     * With a retention of one, adding two checkpoints followed by three savepoints is expected to
     * leave the latest checkpoint and the latest savepoint in the store.
     */
    @Test
    public void testAtLeastOneCheckpointRetained() throws Exception {
        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
        CompletedCheckpoint cp2 = getCheckpoint(false, 2L);
        CompletedCheckpoint sp1 = getCheckpoint(true, 3L);
        CompletedCheckpoint sp2 = getCheckpoint(true, 4L);
        CompletedCheckpoint sp3 = getCheckpoint(true, 5L);
        testCheckpointRetention(
                1, Arrays.asList(cp1, cp2, sp1, sp2, sp3), Arrays.asList(cp2, sp3));
    }

    /**
     * With a retention of one, a savepoint followed by a newer checkpoint is expected to be
     * dropped, leaving only the newer checkpoint.
     */
    @Test
    public void testOlderSavepointSubsumed() throws Exception {
        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
        CompletedCheckpoint sp1 = getCheckpoint(true, 2L);
        CompletedCheckpoint cp2 = getCheckpoint(false, 3L);
        testCheckpointRetention(1, Arrays.asList(cp1, sp1, cp2), Arrays.asList(cp2));
    }

    /**
     * With a retention of one, a synchronous savepoint added last is expected to be the only
     * entry left in the store.
     */
    @Test
    public void testSubsumeAfterStoppingWithSavepoint() throws Exception {
        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
        CompletedCheckpoint sp1 = getCheckpoint(true, 2L);
        CompletedCheckpoint stop =
                getCheckpoint(CheckpointProperties.forSyncSavepoint(false, false), 3L);
        testCheckpointRetention(1, Arrays.asList(cp1, sp1, stop), Arrays.asList(stop));
    }

    /** With a retention of three, three added checkpoints are all expected to be kept. */
    @Test
    public void testNotSubsumedIfNotNeeded() throws Exception {
        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
        CompletedCheckpoint cp2 = getCheckpoint(false, 2L);
        CompletedCheckpoint cp3 = getCheckpoint(false, 3L);
        testCheckpointRetention(
                3, Arrays.asList(cp1, cp2, cp3), Arrays.asList(cp1, cp2, cp3));
    }

    /**
     * Builds a store whose state handle store reports three pre-existing handles, adds the given
     * checkpoints in order, and asserts on the checkpoints the store reports afterwards.
     *
     * @param numRetain maximum number of checkpoints the store is configured to retain
     * @param completed checkpoints to add, in order
     * @param expectedRetained expected result of {@code getAllCheckpoints()} after all additions
     */
    private void testCheckpointRetention(
            int numRetain,
            List<CompletedCheckpoint> completed,
            List<CompletedCheckpoint> expectedRetained)
            throws Exception {
        TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
                builder.setGetAllSupplier(() -> createStateHandles(3)).build();
        CompletedCheckpointStore completedCheckpointStore =
                createCompletedCheckpointStore(stateHandleStore, numRetain);

        for (CompletedCheckpoint checkpoint : completed) {
            completedCheckpointStore.addCheckpoint(
                    checkpoint, new CheckpointsCleaner(), () -> {});
        }

        Assert.assertEquals(expectedRetained, completedCheckpointStore.getAllCheckpoints());
    }

    /** The three handles returned by the state handle store must be recovered as ids 1, 2, 3. */
    @Test
    public void testRecoverSortedCheckpoints() throws Exception {
        TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
                builder.setGetAllSupplier(() -> createStateHandles(3)).build();
        CompletedCheckpointStore completedCheckpointStore =
                createCompletedCheckpointStore(stateHandleStore);

        List<CompletedCheckpoint> recoveredCompletedCheckpoint =
                completedCheckpointStore.getAllCheckpoints();
        Assert.assertThat(recoveredCompletedCheckpoint.size(), Matchers.is(3));

        List<Long> checkpointIds =
                recoveredCompletedCheckpoint.stream()
                        .map(CompletedCheckpoint::getCheckpointID)
                        .collect(Collectors.toList());
        Assert.assertThat(checkpointIds, Matchers.contains(1L, 2L, 3L));
    }

    /**
     * When retrieving checkpoint 2 throws an {@link IOException}, creating the store must fail
     * with an exception whose cause chain contains an {@link IOException}.
     */
    @Test
    public void testCorruptDataInStateHandleStoreShouldNotBeSkipped() throws Exception {
        final long corruptCkpId = 2L;
        checkpointStorageHelper.setRetrieveStateFunction(
                state -> {
                    if (state.getCheckpointID() == corruptCkpId) {
                        throw new IOException("Failed to retrieve checkpoint " + corruptCkpId);
                    }
                    return state;
                });
        TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
                builder.setGetAllSupplier(() -> createStateHandles(3)).build();

        try {
            createCompletedCheckpointStore(stateHandleStore);
        } catch (Exception e) {
            if (ExceptionUtils.findThrowable(e, IOException.class).isPresent()) {
                // Expected: the retrieval failure surfaced instead of being skipped.
                return;
            }
            // Any other failure is unexpected and should fail the test as-is.
            throw e;
        }
        Assert.fail("Expected store creation to fail because checkpoint 2 cannot be retrieved.");
    }

    /**
     * After a successful add with a retention of one, the new checkpoint must have been passed to
     * the state handle store and must be the only checkpoint the store reports.
     */
    @Test
    public void testAddCheckpointSuccessfullyShouldRemoveOldOnes() throws Exception {
        final int num = 1;
        final CompletableFuture<CompletedCheckpoint> addFuture = new CompletableFuture<>();
        TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
                builder.setGetAllSupplier(() -> createStateHandles(num))
                        .setAddFunction(
                                (ignore, ckp) -> {
                                    // Capture what the store hands to the state handle store.
                                    addFuture.complete(ckp);
                                    return null;
                                })
                        .build();
        CompletedCheckpointStore completedCheckpointStore =
                createCompletedCheckpointStore(stateHandleStore);

        Assert.assertThat(completedCheckpointStore.getAllCheckpoints().size(), Matchers.is(num));
        Assert.assertThat(
                completedCheckpointStore.getAllCheckpoints().get(0).getCheckpointID(),
                Matchers.is(1L));

        final long ckpId = 100L;
        CompletedCheckpoint ckp2 =
                CompletedCheckpointStoreTest.createCheckpoint(ckpId, new SharedStateRegistry());
        completedCheckpointStore.addCheckpoint(ckp2, new CheckpointsCleaner(), () -> {});

        CompletedCheckpoint addedCkp = addFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
        Assert.assertThat(addedCkp.getCheckpointID(), Matchers.is(ckpId));
        Assert.assertThat(completedCheckpointStore.getAllCheckpoints().size(), Matchers.is(num));
        Assert.assertThat(
                completedCheckpointStore.getAllCheckpoints().get(0).getCheckpointID(),
                Matchers.is(ckpId));
    }

    /**
     * When the state handle store rejects the add, the exception must reach the caller and the
     * previously recovered checkpoint must still be the only one reported.
     */
    @Test
    public void testAddCheckpointFailedShouldNotRemoveOldOnes() throws Exception {
        final int num = 1;
        final String errMsg = "Add to state handle failed.";
        TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
                builder.setGetAllSupplier(() -> createStateHandles(num))
                        .setAddFunction(
                                (ignore, ckp) -> {
                                    throw new FlinkException(errMsg);
                                })
                        .build();
        CompletedCheckpointStore completedCheckpointStore =
                createCompletedCheckpointStore(stateHandleStore);

        Assert.assertThat(completedCheckpointStore.getAllCheckpoints().size(), Matchers.is(num));
        Assert.assertThat(
                completedCheckpointStore.getAllCheckpoints().get(0).getCheckpointID(),
                Matchers.is(1L));

        final long ckpId = 100L;
        CompletedCheckpoint ckp2 =
                CompletedCheckpointStoreTest.createCheckpoint(ckpId, new SharedStateRegistry());
        try {
            completedCheckpointStore.addCheckpoint(ckp2, new CheckpointsCleaner(), () -> {});
            Assert.fail("We should get an exception when add checkpoint to failed..");
        } catch (FlinkException ex) {
            Assert.assertThat(ex, FlinkMatchers.containsMessage(errMsg));
        }

        // The failed add must not have displaced the recovered checkpoint.
        Assert.assertThat(completedCheckpointStore.getAllCheckpoints().size(), Matchers.is(num));
        Assert.assertThat(
                completedCheckpointStore.getAllCheckpoints().get(0).getCheckpointID(),
                Matchers.is(1L));
    }

    /**
     * Shutting down with {@link JobStatus#CANCELED} must call remove once per recovered handle,
     * run the clear-entries callback, and leave the store empty.
     */
    @Test
    public void testShutdownShouldDiscardStateHandleWhenJobIsGloballyTerminalState()
            throws Exception {
        final int num = 3;
        final AtomicInteger removeCalledNum = new AtomicInteger(0);
        final CompletableFuture<Void> clearEntriesAllFuture = new CompletableFuture<>();
        TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
                builder.setGetAllSupplier(() -> createStateHandles(num))
                        .setRemoveFunction(
                                ignore -> {
                                    removeCalledNum.incrementAndGet();
                                    return true;
                                })
                        .setClearEntriesRunnable(() -> clearEntriesAllFuture.complete(null))
                        .build();
        CompletedCheckpointStore completedCheckpointStore =
                createCompletedCheckpointStore(stateHandleStore);

        Assert.assertThat(completedCheckpointStore.getAllCheckpoints().size(), Matchers.is(num));

        completedCheckpointStore.shutdown(JobStatus.CANCELED, new CheckpointsCleaner());

        Assert.assertThat(removeCalledNum.get(), Matchers.is(num));
        Assert.assertThat(clearEntriesAllFuture.isDone(), Matchers.is(true));
        Assert.assertThat(completedCheckpointStore.getAllCheckpoints().size(), Matchers.is(0));
    }

    /**
     * Shutting down with {@link JobStatus#CANCELLING} must not call remove or the clear-entries
     * callback, must run the release-all callback, and must leave the store empty.
     */
    @Test
    public void testShutdownShouldNotDiscardStateHandleWhenJobIsNotGloballyTerminalState()
            throws Exception {
        final AtomicInteger removeCalledNum = new AtomicInteger(0);
        final CompletableFuture<Void> removeAllFuture = new CompletableFuture<>();
        final CompletableFuture<Void> releaseAllFuture = new CompletableFuture<>();
        TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
                builder.setGetAllSupplier(() -> createStateHandles(3))
                        .setRemoveFunction(
                                ignore -> {
                                    removeCalledNum.incrementAndGet();
                                    return true;
                                })
                        .setReleaseAllHandlesRunnable(() -> releaseAllFuture.complete(null))
                        .setClearEntriesRunnable(() -> removeAllFuture.complete(null))
                        .build();
        CompletedCheckpointStore completedCheckpointStore =
                createCompletedCheckpointStore(stateHandleStore);

        Assert.assertThat(completedCheckpointStore.getAllCheckpoints().size(), Matchers.is(3));

        completedCheckpointStore.shutdown(JobStatus.CANCELLING, new CheckpointsCleaner());

        try {
            removeAllFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
            Assert.fail(
                    "We should get an expected timeout because the job is not globally terminated.");
        } catch (TimeoutException expected) {
            // Expected: the clear-entries callback is never invoked for this job status.
        }

        Assert.assertThat(removeCalledNum.get(), Matchers.is(0));
        Assert.assertThat(removeAllFuture.isDone(), Matchers.is(false));
        Assert.assertThat(releaseAllFuture.isDone(), Matchers.is(true));
        Assert.assertThat(completedCheckpointStore.getAllCheckpoints().size(), Matchers.is(0));
    }

    /**
     * For every {@link JobStatus}, adding a checkpoint after shutdown must throw an {@link
     * IllegalStateException}.
     */
    @Test
    public void testShutdownFailsAnyFutureCallsToAddCheckpoint() throws Exception {
        final CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner();
        for (JobStatus status : JobStatus.values()) {
            final CompletedCheckpointStore completedCheckpointStore =
                    createCompletedCheckpointStore(builder.build());
            completedCheckpointStore.shutdown(status, checkpointsCleaner);
            Assert.assertThrows(
                    IllegalStateException.class,
                    () ->
                            completedCheckpointStore.addCheckpoint(
                                    CompletedCheckpointStoreTest.createCheckpoint(
                                            0L, new SharedStateRegistry()),
                                    checkpointsCleaner,
                                    () -> {}));
        }
    }

    /**
     * Creates {@code num} state handles for checkpoints with ids {@code 1..num}, each paired with
     * its id rendered as a string.
     *
     * @param num number of handles to create
     * @return the handles in ascending id order; empty when {@code num} is less than one
     */
    private List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> createStateHandles(
            int num) {
        final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> stateHandles =
                new ArrayList<>();
        for (int i = 1; i <= num; i++) {
            final CompletedCheckpointStoreTest.TestCompletedCheckpoint completedCheckpoint =
                    CompletedCheckpointStoreTest.createCheckpoint(i, new SharedStateRegistry());
            final RetrievableStateHandle<CompletedCheckpoint> checkpointStateHandle =
                    checkpointStorageHelper.store(completedCheckpoint);
            stateHandles.add(new Tuple2<>(checkpointStateHandle, String.valueOf(i)));
        }
        return stateHandles;
    }

    /** Creates a store on top of {@code stateHandleStore} that retains a single checkpoint. */
    private CompletedCheckpointStore createCompletedCheckpointStore(
            TestingStateHandleStore<CompletedCheckpoint> stateHandleStore) throws Exception {
        return createCompletedCheckpointStore(stateHandleStore, 1);
    }

    /**
     * Creates a store on top of {@code stateHandleStore}, initialised with the checkpoints
     * retrieved from it, using the decimal checkpoint id as the entry name.
     *
     * @param stateHandleStore backing state handle store
     * @param toRetain maximum number of checkpoints to retain
     * @throws Exception if retrieving the existing checkpoints or creating the store fails
     */
    private CompletedCheckpointStore createCompletedCheckpointStore(
            TestingStateHandleStore<CompletedCheckpoint> stateHandleStore, int toRetain)
            throws Exception {
        final CheckpointStoreUtil checkpointStoreUtil =
                new CheckpointStoreUtil() {

                    @Override
                    public String checkpointIDToName(long checkpointId) {
                        return String.valueOf(checkpointId);
                    }

                    @Override
                    public long nameToCheckpointID(String name) {
                        return Long.parseLong(name);
                    }
                };
        return new DefaultCompletedCheckpointStore<>(
                toRetain,
                stateHandleStore,
                checkpointStoreUtil,
                DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
                        stateHandleStore, checkpointStoreUtil),
                executorService);
    }

    /**
     * Creates a checkpoint with the given id, using savepoint properties when {@code isSavepoint}
     * is set and never-retained checkpoint properties otherwise.
     */
    private CompletedCheckpoint getCheckpoint(boolean isSavepoint, long id) {
        final CheckpointProperties props =
                isSavepoint
                        ? CheckpointProperties.forSavepoint(false)
                        : CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
        return getCheckpoint(props, id);
    }

    /**
     * Creates a checkpoint with a fresh job id, the given id and properties, zeroed timestamps,
     * no operator or master state, and a test storage location.
     */
    private CompletedCheckpoint getCheckpoint(CheckpointProperties props, long id) {
        return new CompletedCheckpoint(
                new JobID(),
                id,
                0L,
                0L,
                Collections.emptyMap(),
                Collections.emptyList(),
                props,
                new TestCompletedCheckpointStorageLocation());
    }
}

