/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.ChangelogTaskLocalStateStore;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalSnapshotDirectoryProvider;
import org.apache.flink.runtime.state.LocalSnapshotDirectoryProviderImpl;
import org.apache.flink.runtime.state.TaskLocalStateStoreImplTest;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link ChangelogTaskLocalStateStore}.
 *
 * <p>Re-runs the scenarios of {@link TaskLocalStateStoreImplTest} against a changelog store and
 * additionally checks when the directory belonging to a materialization ID exists on disk after
 * checkpoints are pruned, confirmed or aborted.
 */
class ChangelogTaskLocalStateStoreTest extends TaskLocalStateStoreImplTest {
    /** Directory provider of the most recently created store; used to locate materialized dirs. */
    private LocalSnapshotDirectoryProvider localSnapshotDirectoryProvider;

    /** Runs the parent set-up and then replaces the store under test with a changelog store. */
    @Override
    @BeforeEach
    void before() throws Exception {
        super.before();
        this.taskLocalStateStore =
                this.createChangelogTaskLocalStateStore(
                        this.allocationBaseDirs,
                        this.jobID,
                        this.allocationID,
                        this.jobVertexID,
                        this.subtaskIdx);
    }

    /**
     * Builds a {@link ChangelogTaskLocalStateStore} with backup and recovery enabled and a direct
     * executor, and remembers its directory provider in {@link #localSnapshotDirectoryProvider}.
     *
     * @param allocationBaseDirs base directories handed to the directory provider
     * @param jobID job the store belongs to
     * @param allocationID allocation the store belongs to
     * @param jobVertexID job vertex the store belongs to
     * @param subtaskIdx subtask index the store belongs to
     * @return the newly created store
     */
    @Nonnull
    private ChangelogTaskLocalStateStore createChangelogTaskLocalStateStore(
            File[] allocationBaseDirs,
            JobID jobID,
            AllocationID allocationID,
            JobVertexID jobVertexID,
            int subtaskIdx) {
        this.localSnapshotDirectoryProvider =
                new LocalSnapshotDirectoryProviderImpl(
                        allocationBaseDirs, jobID, jobVertexID, subtaskIdx);
        LocalRecoveryConfig localRecoveryConfig =
                LocalRecoveryConfig.backupAndRecoveryEnabled(this.localSnapshotDirectoryProvider);
        return new ChangelogTaskLocalStateStore(
                jobID,
                allocationID,
                jobVertexID,
                subtaskIdx,
                localRecoveryConfig,
                Executors.directExecutor());
    }

    /**
     * Stores checkpoints 1-3 (all with materialization ID 1), prunes every checkpoint except 2 and
     * asserts that 1 and 3 are gone and discarded while checkpoint 2 and the materialized
     * directory 1 are still present.
     */
    @Override
    @Test
    void pruneCheckpoints() throws Exception {
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot stateSnapshot1 =
                this.storeChangelogStates(1L, 1L);
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot stateSnapshot2 =
                this.storeChangelogStates(2L, 1L);
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot stateSnapshot3 =
                this.storeChangelogStates(3L, 1L);

        this.taskLocalStateStore.pruneMatchingCheckpoints(id -> id != 2L);

        Assertions.assertThat(this.taskLocalStateStore.retrieveLocalState(3L)).isNull();
        Assertions.assertThat(stateSnapshot3.isDiscarded()).isTrue();
        Assertions.assertThat(this.taskLocalStateStore.retrieveLocalState(1L)).isNull();
        Assertions.assertThat(stateSnapshot1.isDiscarded()).isTrue();
        // The surviving checkpoint 2 was stored with materialization ID 1.
        Assertions.assertThat(this.checkMaterializedDirExists(1L)).isTrue();
        Assertions.assertThat(this.taskLocalStateStore.retrieveLocalState(2L))
                .isEqualTo(stateSnapshot2);
    }

    /**
     * Confirms checkpoint 3 and then checkpoint 4 and asserts, after each confirmation, which
     * snapshots are discarded and whether the materialized directory 1 exists.
     */
    @Override
    @Test
    void confirmCheckpoint() throws Exception {
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot stateSnapshot1 =
                this.storeChangelogStates(1L, 1L);
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot stateSnapshot2 =
                this.storeChangelogStates(2L, 1L);
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot stateSnapshot3 =
                this.storeChangelogStates(3L, 1L);

        this.taskLocalStateStore.confirmCheckpoint(3L);

        Assertions.assertThat(this.taskLocalStateStore.retrieveLocalState(2L)).isNull();
        Assertions.assertThat(stateSnapshot2.isDiscarded()).isTrue();
        Assertions.assertThat(stateSnapshot1.isDiscarded()).isTrue();
        // Checkpoint 3 was stored with materialization ID 1.
        Assertions.assertThat(this.checkMaterializedDirExists(1L)).isTrue();
        Assertions.assertThat(this.taskLocalStateStore.retrieveLocalState(3L))
                .isEqualTo(stateSnapshot3);

        // Checkpoint 4 is stored with the next materialization ID (2).
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot stateSnapshot4 =
                this.storeChangelogStates(4L, 2L);

        this.taskLocalStateStore.confirmCheckpoint(4L);

        Assertions.assertThat(this.taskLocalStateStore.retrieveLocalState(3L)).isNull();
        Assertions.assertThat(stateSnapshot3.isDiscarded()).isTrue();
        Assertions.assertThat(this.checkMaterializedDirExists(1L)).isFalse();
        Assertions.assertThat(this.taskLocalStateStore.retrieveLocalState(4L))
                .isEqualTo(stateSnapshot4);
    }

    /**
     * Stores checkpoint 1 (materialization 1) and checkpoints 2 and 3 (materialization 2), aborts
     * checkpoint 2 and then checkpoint 3, and asserts after each abort which snapshots and
     * materialized directories are still present.
     */
    @Override
    @Test
    void abortCheckpoint() throws Exception {
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot stateSnapshot1 =
                this.storeChangelogStates(1L, 1L);
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot stateSnapshot2 =
                this.storeChangelogStates(2L, 2L);
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot stateSnapshot3 =
                this.storeChangelogStates(3L, 2L);

        this.taskLocalStateStore.abortCheckpoint(2L);

        Assertions.assertThat(this.taskLocalStateStore.retrieveLocalState(2L)).isNull();
        Assertions.assertThat(stateSnapshot2.isDiscarded()).isTrue();
        // Checkpoint 3 was also stored with materialization ID 2.
        Assertions.assertThat(this.checkMaterializedDirExists(2L)).isTrue();
        Assertions.assertThat(this.taskLocalStateStore.retrieveLocalState(1L))
                .isEqualTo(stateSnapshot1);
        Assertions.assertThat(this.checkMaterializedDirExists(1L)).isTrue();
        Assertions.assertThat(this.taskLocalStateStore.retrieveLocalState(3L))
                .isEqualTo(stateSnapshot3);

        this.taskLocalStateStore.abortCheckpoint(3L);

        Assertions.assertThat(this.checkMaterializedDirExists(2L)).isFalse();
    }

    /**
     * Stores a snapshot, creates a second store over the same directories and asserts that the
     * second store retrieves an equal snapshot.
     */
    @Override
    @Test
    void retrievePersistedLocalStateFromDisc() {
        TaskStateSnapshot taskStateSnapshot = this.createTaskStateSnapshot();
        long checkpointId = 0L;
        this.taskLocalStateStore.storeLocalState(checkpointId, taskStateSnapshot);

        ChangelogTaskLocalStateStore newTaskLocalStateStore =
                this.createChangelogTaskLocalStateStore(
                        this.allocationBaseDirs,
                        this.jobID,
                        this.allocationID,
                        this.jobVertexID,
                        this.subtaskIdx);

        TaskStateSnapshot retrievedTaskStateSnapshot =
                newTaskLocalStateStore.retrieveLocalState(checkpointId);
        Assertions.assertThat(retrievedTaskStateSnapshot).isEqualTo(taskStateSnapshot);
    }

    /**
     * Overwrites the start of the stored snapshot file with four arbitrary bytes and asserts that
     * a fresh store returns {@code null} for that checkpoint and that the file's parent directory
     * no longer exists afterwards.
     */
    @Override
    @Test
    void deletesLocalStateIfRetrievalFails() throws IOException {
        TaskStateSnapshot taskStateSnapshot = this.createTaskStateSnapshot();
        long checkpointId = 0L;
        this.taskLocalStateStore.storeLocalState(checkpointId, taskStateSnapshot);

        File taskStateSnapshotFile =
                this.taskLocalStateStore.getTaskStateSnapshotFile(checkpointId);
        // Overwrite the beginning of the existing file so that reading it back fails.
        Files.write(
                taskStateSnapshotFile.toPath(),
                new byte[] {1, 2, 3, 4},
                StandardOpenOption.WRITE);

        ChangelogTaskLocalStateStore newTaskLocalStateStore =
                this.createChangelogTaskLocalStateStore(
                        this.allocationBaseDirs,
                        this.jobID,
                        this.allocationID,
                        this.jobVertexID,
                        this.subtaskIdx);

        Assertions.assertThat(newTaskLocalStateStore.retrieveLocalState(checkpointId)).isNull();
        Assertions.assertThat(taskStateSnapshotFile.getParentFile()).doesNotExist();
    }

    /** Resolves the directory the current directory provider reports for the given ID. */
    private File materializedDir(long materializationID) {
        return this.localSnapshotDirectoryProvider.subtaskSpecificCheckpointDirectory(
                materializationID);
    }

    /** Returns whether the directory for the given materialization ID exists on disk. */
    private boolean checkMaterializedDirExists(long materializationID) {
        return this.materializedDir(materializationID).exists();
    }

    /**
     * Ensures the directory for the given materialization ID exists, creating it if necessary.
     *
     * @throws FlinkRuntimeException if the directory does not exist and cannot be created
     */
    private void writeToMaterializedDir(long materializationID) {
        File materializedDir = this.materializedDir(materializationID);
        if (!materializedDir.exists() && !materializedDir.mkdirs()) {
            throw new FlinkRuntimeException(
                    String.format(
                            "Could not create the materialized directory '%s'", materializedDir));
        }
    }

    /**
     * Creates the materialized directory for {@code materializationID}, builds a snapshot holding
     * a single changelog keyed-state handle for a fresh operator ID and stores it in the store
     * under test as checkpoint {@code checkpointID}.
     *
     * @param checkpointID checkpoint ID to store the snapshot under; also passed to the handle
     * @param materializationID materialization ID passed to the handle and used for the directory
     * @return the stored snapshot, so that callers can check whether it was discarded
     */
    private TaskLocalStateStoreImplTest.TestingTaskStateSnapshot storeChangelogStates(
            long checkpointID, long materializationID) {
        this.writeToMaterializedDir(materializationID);

        OperatorID operatorID = new OperatorID();
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot taskStateSnapshot =
                new TaskLocalStateStoreImplTest.TestingTaskStateSnapshot();
        OperatorSubtaskState operatorSubtaskState =
                OperatorSubtaskState.builder()
                        .setManagedKeyedState(
                                new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(
                                        Collections.emptyList(),
                                        Collections.emptyList(),
                                        new KeyGroupRange(0, 3),
                                        checkpointID,
                                        materializationID,
                                        checkpointID))
                        .build();
        taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);

        this.taskLocalStateStore.storeLocalState(checkpointID, taskStateSnapshot);
        return taskStateSnapshot;
    }
}

