/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalSnapshotDirectoryProvider;
import org.apache.flink.runtime.state.LocalSnapshotDirectoryProviderImpl;
import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class TaskLocalStateStoreImplTest {
    @TempDir
    protected Path temporaryFolder;
    protected File[] allocationBaseDirs;
    protected TaskLocalStateStoreImpl taskLocalStateStore;
    protected JobID jobID;
    protected AllocationID allocationID;
    protected JobVertexID jobVertexID;
    protected int subtaskIdx;

    TaskLocalStateStoreImplTest() {
    }

    @BeforeEach
    void before() throws Exception {
        this.jobID = new JobID();
        this.allocationID = new AllocationID();
        this.jobVertexID = new JobVertexID();
        this.subtaskIdx = 0;
        this.allocationBaseDirs = new File[]{TempDirUtils.newFolder((Path)this.temporaryFolder), TempDirUtils.newFolder((Path)this.temporaryFolder)};
        this.taskLocalStateStore = this.createTaskLocalStateStoreImpl(this.allocationBaseDirs, this.jobID, this.allocationID, this.jobVertexID, this.subtaskIdx);
    }

    @Nonnull
    private TaskLocalStateStoreImpl createTaskLocalStateStoreImpl(File[] allocationBaseDirs, JobID jobID, AllocationID allocationID, JobVertexID jobVertexID, int subtaskIdx) {
        LocalSnapshotDirectoryProviderImpl directoryProvider = new LocalSnapshotDirectoryProviderImpl(allocationBaseDirs, jobID, jobVertexID, subtaskIdx);
        LocalRecoveryConfig localRecoveryConfig = LocalRecoveryConfig.backupAndRecoveryEnabled((LocalSnapshotDirectoryProvider)directoryProvider);
        return new TaskLocalStateStoreImpl(jobID, allocationID, jobVertexID, subtaskIdx, localRecoveryConfig, Executors.directExecutor());
    }

    @Test
    void getLocalRecoveryRootDirectoryProvider() {
        LocalRecoveryConfig directoryProvider = this.taskLocalStateStore.getLocalRecoveryConfig();
        Assertions.assertThat((int)((LocalSnapshotDirectoryProvider)directoryProvider.getLocalStateDirectoryProvider().get()).allocationBaseDirsCount()).isEqualTo(this.allocationBaseDirs.length);
        for (int i = 0; i < this.allocationBaseDirs.length; ++i) {
            Assertions.assertThat((File)((LocalSnapshotDirectoryProvider)directoryProvider.getLocalStateDirectoryProvider().get()).selectAllocationBaseDirectory(i)).isEqualTo((Object)this.allocationBaseDirs[i]);
        }
    }

    @Test
    void storeAndRetrieve() throws Exception {
        int chkCount = 3;
        for (int i = 0; i < 3; ++i) {
            Assertions.assertThat((Object)this.taskLocalStateStore.retrieveLocalState((long)i)).isNull();
        }
        List<TestingTaskStateSnapshot> taskStateSnapshots = this.storeStates(3);
        this.checkStoredAsExpected(taskStateSnapshots, 0, 3);
        Assertions.assertThat((Object)this.taskLocalStateStore.retrieveLocalState(4L)).isNull();
    }

    @Test
    void pruneCheckpoints() throws Exception {
        int chkCount = 3;
        List<TestingTaskStateSnapshot> taskStateSnapshots = this.storeStates(3);
        this.taskLocalStateStore.pruneMatchingCheckpoints(chk -> chk != 2L);
        for (int i = 0; i < 2; ++i) {
            Assertions.assertThat((Object)this.taskLocalStateStore.retrieveLocalState((long)i)).isNull();
        }
        this.checkStoredAsExpected(taskStateSnapshots, 2, 3);
    }

    @Test
    void confirmCheckpoint() throws Exception {
        int chkCount = 3;
        int confirmed = 2;
        List<TestingTaskStateSnapshot> taskStateSnapshots = this.storeStates(3);
        this.taskLocalStateStore.confirmCheckpoint(2L);
        this.checkPrunedAndDiscarded(taskStateSnapshots, 0, 2);
        this.checkStoredAsExpected(taskStateSnapshots, 2, 3);
    }

    @Test
    void abortCheckpoint() throws Exception {
        int chkCount = 4;
        int aborted = 2;
        List<TestingTaskStateSnapshot> taskStateSnapshots = this.storeStates(4);
        this.taskLocalStateStore.abortCheckpoint(2L);
        this.checkPrunedAndDiscarded(taskStateSnapshots, 2, 3);
        this.checkStoredAsExpected(taskStateSnapshots, 0, 2);
        this.checkStoredAsExpected(taskStateSnapshots, 3, 4);
    }

    @Test
    void dispose() throws Exception {
        int chkCount = 3;
        int confirmed = 2;
        List<TestingTaskStateSnapshot> taskStateSnapshots = this.storeStates(3);
        this.taskLocalStateStore.confirmCheckpoint(2L);
        this.taskLocalStateStore.dispose();
        this.checkPrunedAndDiscarded(taskStateSnapshots, 0, 3);
    }

    @Test
    void retrieveNullIfNoPersistedLocalState() {
        Assertions.assertThat((Object)this.taskLocalStateStore.retrieveLocalState(0L)).isNull();
    }

    @Test
    void retrieveNullIfDisableLocalRecovery() {
        LocalSnapshotDirectoryProviderImpl directoryProvider = new LocalSnapshotDirectoryProviderImpl(this.allocationBaseDirs, this.jobID, this.jobVertexID, this.subtaskIdx);
        LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(false, true, (LocalSnapshotDirectoryProvider)directoryProvider);
        TaskLocalStateStoreImpl localStateStore = new TaskLocalStateStoreImpl(this.jobID, this.allocationID, this.jobVertexID, this.subtaskIdx, localRecoveryConfig, Executors.directExecutor());
        TaskStateSnapshot taskStateSnapshot = this.createTaskStateSnapshot();
        long checkpointId = 1L;
        localStateStore.storeLocalState(1L, taskStateSnapshot);
        Assertions.assertThat((Object)localStateStore.retrieveLocalState(1L)).isNull();
    }

    @Test
    void retrievePersistedLocalStateFromDisc() {
        TaskStateSnapshot taskStateSnapshot = this.createTaskStateSnapshot();
        long checkpointId = 0L;
        this.taskLocalStateStore.storeLocalState(0L, taskStateSnapshot);
        TaskLocalStateStoreImpl newTaskLocalStateStore = this.createTaskLocalStateStoreImpl(this.allocationBaseDirs, this.jobID, this.allocationID, this.jobVertexID, 0);
        TaskStateSnapshot retrievedTaskStateSnapshot = newTaskLocalStateStore.retrieveLocalState(0L);
        Assertions.assertThat((Object)retrievedTaskStateSnapshot).isEqualTo((Object)taskStateSnapshot);
    }

    @Nonnull
    protected TaskStateSnapshot createTaskStateSnapshot() {
        HashMap<OperatorID, OperatorSubtaskState> operatorSubtaskStates = new HashMap<OperatorID, OperatorSubtaskState>();
        operatorSubtaskStates.put(new OperatorID(), OperatorSubtaskState.builder().build());
        operatorSubtaskStates.put(new OperatorID(), OperatorSubtaskState.builder().build());
        TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(operatorSubtaskStates);
        return taskStateSnapshot;
    }

    @Test
    void deletesLocalStateIfRetrievalFails() throws IOException {
        TaskStateSnapshot taskStateSnapshot = this.createTaskStateSnapshot();
        long checkpointId = 0L;
        this.taskLocalStateStore.storeLocalState(0L, taskStateSnapshot);
        File taskStateSnapshotFile = this.taskLocalStateStore.getTaskStateSnapshotFile(0L);
        Files.write(taskStateSnapshotFile.toPath(), new byte[]{1, 2, 3, 4}, StandardOpenOption.WRITE);
        TaskLocalStateStoreImpl newTaskLocalStateStore = this.createTaskLocalStateStoreImpl(this.allocationBaseDirs, this.jobID, this.allocationID, this.jobVertexID, this.subtaskIdx);
        Assertions.assertThat((Object)newTaskLocalStateStore.retrieveLocalState(0L)).isNull();
        Assertions.assertThat((File)taskStateSnapshotFile.getParentFile()).doesNotExist();
    }

    private void checkStoredAsExpected(List<TestingTaskStateSnapshot> history, int start, int end) {
        for (int i = start; i < end; ++i) {
            TestingTaskStateSnapshot expected = history.get(i);
            Assertions.assertThat((Object)this.taskLocalStateStore.retrieveLocalState((long)i)).isSameAs((Object)expected);
            Assertions.assertThat((boolean)expected.isDiscarded()).isFalse();
        }
    }

    private void checkPrunedAndDiscarded(List<TestingTaskStateSnapshot> history, int start, int end) {
        for (int i = start; i < end; ++i) {
            Assertions.assertThat((Object)this.taskLocalStateStore.retrieveLocalState((long)i)).isNull();
            Assertions.assertThat((boolean)history.get(i).isDiscarded()).isTrue();
        }
    }

    private List<TestingTaskStateSnapshot> storeStates(int count) {
        ArrayList<TestingTaskStateSnapshot> taskStateSnapshots = new ArrayList<TestingTaskStateSnapshot>(count);
        for (int i = 0; i < count; ++i) {
            OperatorID operatorID = new OperatorID();
            TestingTaskStateSnapshot taskStateSnapshot = new TestingTaskStateSnapshot();
            OperatorSubtaskState operatorSubtaskState = OperatorSubtaskState.builder().build();
            taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
            this.taskLocalStateStore.storeLocalState((long)i, (TaskStateSnapshot)taskStateSnapshot);
            taskStateSnapshots.add(taskStateSnapshot);
        }
        return taskStateSnapshots;
    }

    protected static final class TestingTaskStateSnapshot
    extends TaskStateSnapshot {
        private static final long serialVersionUID = 2046321877379917040L;
        private boolean isDiscarded = false;

        protected TestingTaskStateSnapshot() {
        }

        public void discardState() throws Exception {
            super.discardState();
            this.isDiscarded = true;
        }

        boolean isDiscarded() {
            return this.isDiscarded;
        }
    }
}

