/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.VoidPermanentBlobService;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.runtime.testutils.WorkingDirectoryExtension;
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Reference;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link TaskExecutorLocalStateStoresManager}.
 *
 * <p>The tests cover how the manager is created through {@link TaskManagerServices} (with and
 * without explicitly configured local state root directories), which directories it hands out for
 * subtasks, and what is left on disk after allocations are released or the manager is shut down.
 */
class TaskExecutorLocalStateStoresManagerTest {

    /** Temporary directory injected by JUnit; every test creates its own sub-folders in it. */
    @TempDir
    public static File temporaryFolder;

    /**
     * Extension used to create a fresh {@link WorkingDirectory} per test. It is built from a
     * supplier that reads {@link #temporaryFolder} lazily, i.e. only when it is actually invoked.
     */
    @RegisterExtension
    public static final AllCallbackWrapper<WorkingDirectoryExtension>
            WORKING_DIRECTORY_EXTENSION_WRAPPER =
                    new AllCallbackWrapper<>(new WorkingDirectoryExtension(() -> temporaryFolder));

    /**
     * Checks that explicitly configured local state root directories and the local recovery flag
     * are picked up by the {@link TaskExecutorLocalStateStoresManager} created through {@link
     * TaskManagerServices}.
     *
     * @throws Exception if creating the folders or the task manager services fails
     */
    @Test
    void testCreationFromConfig() throws Exception {
        final File newFolder = TempDirUtils.newFolder(temporaryFolder.toPath());
        final String tmpDir = newFolder.getAbsolutePath() + File.separator;

        // String#replace substitutes literally. String#replaceAll would treat tmpDir as a regex
        // replacement string, in which '\' (e.g. Windows separators) and '$' are special.
        final String rootDirString =
                "__localStateRoot1,__localStateRoot2,__localStateRoot3".replace("__", tmpDir);

        final Configuration config = new Configuration();
        config.set(CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS, rootDirString);
        config.set(CheckpointingOptions.LOCAL_RECOVERY, true);

        final WorkingDirectory workingDirectory =
                WORKING_DIRECTORY_EXTENSION_WRAPPER.getCustomExtension().createNewWorkingDirectory();
        final TaskManagerServices taskManagerServices =
                createTaskManagerServices(
                        createTaskManagerServiceConfiguration(config, workingDirectory),
                        workingDirectory);

        try {
            final TaskExecutorLocalStateStoresManager taskStateManager =
                    taskManagerServices.getTaskManagerStateStore();

            final String[] configuredRoots = rootDirString.split(",");
            final File[] rootDirectories = taskStateManager.getLocalStateRootDirectories();

            // Fail with a readable assertion (instead of an index error or a silent pass) when
            // the number of reported directories differs from the number of configured ones.
            Assertions.assertThat(rootDirectories).hasSize(configuredRoots.length);
            for (int i = 0; i < configuredRoots.length; i++) {
                Assertions.assertThat(rootDirectories[i].toPath())
                        .startsWith(Paths.get(configuredRoots[i]));
            }

            Assertions.assertThat(taskStateManager.isLocalRecoveryEnabled()).isTrue();

            // Remove the directories this test caused to be created.
            for (File rootDirectory : rootDirectories) {
                FileUtils.deleteFileOrDirectory(rootDirectory);
            }
        } finally {
            taskManagerServices.shutDown();
        }
    }

    /**
     * Checks the behaviour with an empty {@link Configuration}: every reported local state root
     * directory equals the working directory's local state directory and local recovery is
     * disabled.
     *
     * @throws Exception if creating the task manager services fails
     */
    @Test
    void testCreationFromConfigDefault() throws Exception {
        final Configuration config = new Configuration();
        final WorkingDirectory workingDirectory =
                WORKING_DIRECTORY_EXTENSION_WRAPPER.getCustomExtension().createNewWorkingDirectory();
        final TaskManagerServicesConfiguration taskManagerServicesConfiguration =
                createTaskManagerServiceConfiguration(config, workingDirectory);
        final TaskManagerServices taskManagerServices =
                createTaskManagerServices(taskManagerServicesConfiguration, workingDirectory);

        try {
            final TaskExecutorLocalStateStoresManager taskStateManager =
                    taskManagerServices.getTaskManagerStateStore();

            for (File localStateRootDirectory : taskStateManager.getLocalStateRootDirectories()) {
                Assertions.assertThat(localStateRootDirectory)
                        .isEqualTo(workingDirectory.getLocalStateDirectory());
            }

            Assertions.assertThat(taskStateManager.isLocalRecoveryEnabled()).isFalse();
        } finally {
            taskManagerServices.shutDown();
        }
    }

    /**
     * Checks that requesting a subtask state store from a manager created with local recovery
     * disabled yields a config without a directory provider and leaves the root directories
     * empty.
     *
     * @throws Exception if creating the root directories fails
     */
    @Test
    void testLocalStateNoCreateDirWhenDisabledLocalRecovery() throws Exception {
        final JobID jobID = new JobID();
        final JobVertexID jobVertexID = new JobVertexID();
        final AllocationID allocationID = new AllocationID();
        final int subtaskIdx = 23;

        final File[] rootDirs = {
            TempDirUtils.newFolder(temporaryFolder.toPath()),
            TempDirUtils.newFolder(temporaryFolder.toPath()),
            TempDirUtils.newFolder(temporaryFolder.toPath())
        };

        final boolean localRecoveryEnabled = false;
        final TaskExecutorLocalStateStoresManager storesManager =
                new TaskExecutorLocalStateStoresManager(
                        localRecoveryEnabled, Reference.owned(rootDirs), Executors.directExecutor());

        try {
            final TaskLocalStateStore taskLocalStateStore =
                    storesManager.localStateStoreForSubtask(
                            jobID,
                            allocationID,
                            jobVertexID,
                            subtaskIdx,
                            new Configuration(),
                            new Configuration());

            final LocalRecoveryConfig recoveryConfig = taskLocalStateStore.getLocalRecoveryConfig();
            Assertions.assertThat(recoveryConfig.isLocalRecoveryEnabled()).isFalse();
            Assertions.assertThat(recoveryConfig.getLocalStateDirectoryProvider()).isNotPresent();

            for (File recoveryDir : rootDirs) {
                Assertions.assertThat(recoveryDir).isEmptyDirectory();
            }
        } finally {
            // Shut the manager down even when an assertion fails, as the other tests do.
            storesManager.shutdown();
        }
    }

    /**
     * Checks the directory layout handed out for a subtask, and that the root directories contain
     * no files after the allocation is released and again after the manager is shut down.
     *
     * @throws Exception if creating directories or files fails
     */
    @Test
    void testSubtaskStateStoreDirectoryCreateAndDelete() throws Exception {
        final JobID jobID = new JobID();
        final JobVertexID jobVertexID = new JobVertexID();
        final AllocationID allocationID = new AllocationID();
        final int subtaskIdx = 23;

        final File[] rootDirs = {
            TempDirUtils.newFolder(temporaryFolder.toPath()),
            TempDirUtils.newFolder(temporaryFolder.toPath()),
            TempDirUtils.newFolder(temporaryFolder.toPath())
        };

        final TaskExecutorLocalStateStoresManager storesManager =
                new TaskExecutorLocalStateStoresManager(
                        true, Reference.owned(rootDirs), Executors.directExecutor());

        TaskLocalStateStore taskLocalStateStore =
                storesManager.localStateStoreForSubtask(
                        jobID,
                        allocationID,
                        jobVertexID,
                        subtaskIdx,
                        new Configuration(),
                        new Configuration());

        LocalRecoveryDirectoryProvider directoryProvider =
                taskLocalStateStore
                        .getLocalRecoveryConfig()
                        .getLocalStateDirectoryProvider()
                        .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());

        // The allocation base directory is expected to cycle through the root directories.
        // For the non-negative ids used here the sign mask is a no-op.
        // NOTE(review): the mask presumably mirrors the provider's own index computation - confirm.
        for (int i = 0; i < 10; i++) {
            final File expectedBaseDir =
                    new File(
                            rootDirs[(i & Integer.MAX_VALUE) % rootDirs.length],
                            storesManager.allocationSubDirString(allocationID));
            Assertions.assertThat(expectedBaseDir)
                    .isEqualTo(directoryProvider.allocationBaseDirectory(i));
        }

        final long chkId = 42L;
        final File allocBaseDirChk42 = directoryProvider.allocationBaseDirectory(chkId);
        final File subtaskSpecificCheckpointDirectory =
                directoryProvider.subtaskSpecificCheckpointDirectory(chkId);

        final String expectedRelativePath =
                "jid_"
                        + jobID
                        + File.separator
                        + "vtx_"
                        + jobVertexID
                        + "_sti_"
                        + subtaskIdx
                        + File.separator
                        + "chk_"
                        + chkId;
        Assertions.assertThat(new File(allocBaseDirChk42, expectedRelativePath))
                .isEqualTo(subtaskSpecificCheckpointDirectory);

        // Put a file into the checkpoint directory so that the cleanup has something to remove.
        Assertions.assertThat(subtaskSpecificCheckpointDirectory.mkdirs()).isTrue();
        File testFile = new File(subtaskSpecificCheckpointDirectory, "test");
        Assertions.assertThat(testFile.createNewFile()).isTrue();

        Assertions.assertThat(taskLocalStateStore.getLocalRecoveryConfig().isLocalRecoveryEnabled())
                .isEqualTo(storesManager.isLocalRecoveryEnabled());
        Assertions.assertThat(testFile).exists();

        // Releasing the allocation must leave the root directories without any files.
        storesManager.releaseLocalStateForAllocationId(allocationID);
        checkRootDirsClean(rootDirs);

        // Same exercise for a second allocation, this time cleaned up through shutdown().
        final AllocationID otherAllocationID = new AllocationID();
        taskLocalStateStore =
                storesManager.localStateStoreForSubtask(
                        jobID,
                        otherAllocationID,
                        jobVertexID,
                        subtaskIdx,
                        new Configuration(),
                        new Configuration());
        directoryProvider =
                taskLocalStateStore
                        .getLocalRecoveryConfig()
                        .getLocalStateDirectoryProvider()
                        .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());

        final File chkDir = directoryProvider.subtaskSpecificCheckpointDirectory(23L);
        Assertions.assertThat(chkDir.mkdirs()).isTrue();
        testFile = new File(chkDir, "test");
        Assertions.assertThat(testFile.createNewFile()).isTrue();

        storesManager.shutdown();
        checkRootDirsClean(rootDirs);
    }

    /**
     * Checks that directories passed as an owned {@link Reference} no longer exist after the
     * manager is shut down.
     *
     * @throws IOException if creating the directories fails
     */
    @Test
    void testOwnedLocalStateDirectoriesAreDeletedOnShutdown() throws IOException {
        final File[] localStateDirectories = {
            TempDirUtils.newFolder(temporaryFolder.toPath()),
            TempDirUtils.newFolder(temporaryFolder.toPath())
        };

        final TaskExecutorLocalStateStoresManager taskExecutorLocalStateStoresManager =
                new TaskExecutorLocalStateStoresManager(
                        true, Reference.owned(localStateDirectories), Executors.directExecutor());

        for (File localStateDirectory : localStateDirectories) {
            Assertions.assertThat(localStateDirectory).exists();
        }

        taskExecutorLocalStateStoresManager.shutdown();

        for (File localStateDirectory : localStateDirectories) {
            Assertions.assertThat(localStateDirectory).doesNotExist();
        }
    }

    /**
     * Checks that directories passed as a borrowed {@link Reference} still exist after the manager
     * is shut down.
     *
     * @throws IOException if creating the directories fails
     */
    @Test
    void testBorrowedLocalStateDirectoriesAreNotDeletedOnShutdown() throws IOException {
        final File[] localStateDirectories = {
            TempDirUtils.newFolder(temporaryFolder.toPath()),
            TempDirUtils.newFolder(temporaryFolder.toPath())
        };

        final TaskExecutorLocalStateStoresManager taskExecutorLocalStateStoresManager =
                new TaskExecutorLocalStateStoresManager(
                        true, Reference.borrowed(localStateDirectories), Executors.directExecutor());

        for (File localStateDirectory : localStateDirectories) {
            Assertions.assertThat(localStateDirectory).exists();
        }

        taskExecutorLocalStateStoresManager.shutdown();

        for (File localStateDirectory : localStateDirectories) {
            Assertions.assertThat(localStateDirectory).exists();
        }
    }

    /**
     * Checks that {@code retainLocalStateForAllocations} removes the allocation directory of every
     * allocation that is not part of the retained set.
     *
     * @throws IOException if creating or listing the directories fails
     */
    @Test
    void testRetainLocalStateForAllocationsDeletesUnretainedAllocationDirectories()
            throws IOException {
        final File localStateStore = TempDirUtils.newFolder(temporaryFolder.toPath());
        final TaskExecutorLocalStateStoresManager taskExecutorLocalStateStoresManager =
                new TaskExecutorLocalStateStoresManager(
                        true,
                        Reference.owned(new File[] {localStateStore}),
                        Executors.directExecutor());

        try {
            final JobID jobId = new JobID();
            final AllocationID retainedAllocationId = new AllocationID();
            final AllocationID otherAllocationId = new AllocationID();
            final JobVertexID jobVertexId = new JobVertexID();

            // Register one subtask per allocation; the assertion below expects this to produce
            // one allocation directory each.
            taskExecutorLocalStateStoresManager.localStateStoreForSubtask(
                    jobId,
                    retainedAllocationId,
                    jobVertexId,
                    0,
                    new Configuration(),
                    new Configuration());
            taskExecutorLocalStateStoresManager.localStateStoreForSubtask(
                    jobId,
                    otherAllocationId,
                    jobVertexId,
                    1,
                    new Configuration(),
                    new Configuration());

            final Collection<?> allocationDirectories =
                    TaskExecutorLocalStateStoresManager.listAllocationDirectoriesIn(localStateStore);
            Assertions.assertThat(allocationDirectories).hasSize(2);

            taskExecutorLocalStateStoresManager.retainLocalStateForAllocations(
                    Sets.newHashSet(retainedAllocationId));

            final Collection<?> allocationDirectoriesAfterCleanup =
                    TaskExecutorLocalStateStoresManager.listAllocationDirectoriesIn(localStateStore);
            Assertions.assertThat(allocationDirectoriesAfterCleanup).hasSize(1);
            Assertions.assertThat(
                            new File(
                                    localStateStore,
                                    taskExecutorLocalStateStoresManager.allocationSubDirString(
                                            otherAllocationId)))
                    .doesNotExist();
        } finally {
            // Shut the manager down even when an assertion fails, as the other tests do.
            taskExecutorLocalStateStoresManager.shutdown();
        }
    }

    /**
     * Asserts that none of the given root directories contains any entries.
     *
     * @param rootDirs the directories to inspect
     */
    private void checkRootDirsClean(File[] rootDirs) {
        for (File rootDir : rootDirs) {
            final File[] files = rootDir.listFiles();
            // listFiles() returns null when the path is not a directory (e.g. it was deleted) or
            // cannot be read; such a root holds no files and is skipped.
            if (files != null) {
                Assertions.assertThat(files).isEmpty();
            }
        }
    }

    /**
     * Builds a {@link TaskManagerServicesConfiguration} for the local host from the given
     * configuration, using a freshly generated {@link ResourceID}.
     *
     * @param config the Flink configuration to derive the services configuration from
     * @param workingDirectory the working directory handed to the services configuration
     * @return the created services configuration
     * @throws Exception if the local host name cannot be resolved or the configuration is invalid
     */
    private TaskManagerServicesConfiguration createTaskManagerServiceConfiguration(
            Configuration config, WorkingDirectory workingDirectory) throws Exception {
        return TaskManagerServicesConfiguration.fromConfiguration(
                config,
                ResourceID.generate(),
                InetAddress.getLocalHost().getHostName(),
                true,
                TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(config),
                workingDirectory);
    }

    /**
     * Creates {@link TaskManagerServices} from the given services configuration using no-op
     * collaborators: a void blob service, an unregistered metric group, a direct executor service
     * and an error handler that ignores the reported throwable.
     *
     * @param config the services configuration to instantiate the services from
     * @param workingDirectory the working directory handed to the services
     * @return the created task manager services; callers are expected to shut them down
     * @throws Exception if the services cannot be created
     */
    private TaskManagerServices createTaskManagerServices(
            TaskManagerServicesConfiguration config, WorkingDirectory workingDirectory)
            throws Exception {
        return TaskManagerServices.fromConfiguration(
                config,
                VoidPermanentBlobService.INSTANCE,
                UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                Executors.newDirectExecutorService(),
                null,
                throwable -> {},
                workingDirectory);
    }
}

