/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.disk;

import java.io.File;
import java.nio.file.Path;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.testutils.TestJvmProcess;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.ShutdownHookUtil;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FileChannelManagerImplTest {
    private static final Logger LOG = LoggerFactory.getLogger(FileChannelManagerImplTest.class);
    private static final String DIR_NAME_PREFIX = "manager-test";
    private static final String SIGNAL_FILE_FOR_KILLING = "could-kill";
    private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10L);
    @TempDir
    private Path temporaryFolder;

    FileChannelManagerImplTest() {
    }

    @Test
    void testFairness() throws Exception {
        String directory1 = TempDirUtils.newFolder((Path)this.temporaryFolder).getAbsoluteFile().getAbsolutePath();
        String directory2 = TempDirUtils.newFolder((Path)this.temporaryFolder).getAbsoluteFile().getAbsolutePath();
        try (FileChannelManagerImpl fileChannelManager = new FileChannelManagerImpl(new String[]{directory1, directory2}, "test");){
            int i;
            int numChannelIDs = 100000;
            AtomicInteger counter1 = new AtomicInteger();
            AtomicInteger counter2 = new AtomicInteger();
            int numThreads = 10;
            Thread[] threads = new Thread[numThreads];
            for (i = 0; i < numThreads; ++i) {
                threads[i] = new Thread(() -> FileChannelManagerImplTest.lambda$testFairness$0(numChannelIDs, (FileChannelManager)fileChannelManager, directory1, counter1, counter2));
                threads[i].start();
            }
            for (i = 0; i < numThreads; ++i) {
                threads[i].join();
            }
            Assertions.assertThat((AtomicInteger)counter2).hasValue(counter1.get());
        }
    }

    @Test
    void testDirectoriesCleanupOnKillWithoutCallerHook() throws Exception {
        this.testDirectoriesCleanupOnKill(false);
    }

    @Test
    void testDirectoriesCleanupOnKillWithCallerHook() throws Exception {
        this.testDirectoriesCleanupOnKill(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testDirectoriesCleanupOnKill(boolean callerHasHook) throws Exception {
        Assumptions.assumeThat((OperatingSystem.isLinux() || OperatingSystem.isFreeBSD() || OperatingSystem.isSolaris() || OperatingSystem.isMac() ? 1 : 0) != 0).isTrue();
        File fileChannelDir = TempDirUtils.newFolder((Path)this.temporaryFolder);
        File signalDir = TempDirUtils.newFolder((Path)this.temporaryFolder);
        File signalFile = new File(signalDir.getAbsolutePath(), SIGNAL_FILE_FOR_KILLING);
        FileChannelManagerTestProcess fileChannelManagerTestProcess = new FileChannelManagerTestProcess(callerHasHook, fileChannelDir.getAbsolutePath(), signalFile.getAbsolutePath());
        try {
            fileChannelManagerTestProcess.startProcess();
            TestJvmProcess.waitForMarkerFile(signalFile, 3L * TEST_TIMEOUT.toMillis());
            Process kill = Runtime.getRuntime().exec("kill " + fileChannelManagerTestProcess.getProcessId());
            kill.waitFor();
            ((AbstractIntegerAssert)Assertions.assertThat((int)kill.exitValue()).withFailMessage("Failed to send SIG_TERM to process", new Object[0])).isZero();
            Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
            while (fileChannelManagerTestProcess.isAlive() && deadline.hasTimeLeft()) {
                Thread.sleep(100L);
            }
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)fileChannelManagerTestProcess.isAlive()).withFailMessage("The file channel manager test process does not terminate in time, its output is: \n%s", new Object[]{fileChannelManagerTestProcess.getProcessOutput()})).isFalse();
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.fileOrDirExists(fileChannelDir, DIR_NAME_PREFIX)).withFailMessage("The file channel manager test process does not remove the tmp shuffle directories after termination, its output is \n%s", new Object[]{fileChannelManagerTestProcess.getProcessOutput()})).isFalse();
        }
        finally {
            fileChannelManagerTestProcess.destroy();
        }
    }

    private boolean fileOrDirExists(File rootTmpDir, String namePattern) {
        File[] candidates = rootTmpDir.listFiles((dir, name) -> name.contains(namePattern));
        return candidates != null && candidates.length > 0;
    }

    private static /* synthetic */ void lambda$testFairness$0(int numChannelIDs, FileChannelManager fileChannelManager, String directory1, AtomicInteger counter1, AtomicInteger counter2) {
        for (int j = 0; j < numChannelIDs; ++j) {
            FileIOChannel.ID channelID = fileChannelManager.createChannel();
            if (channelID.getPath().startsWith(directory1)) {
                counter1.incrementAndGet();
                continue;
            }
            counter2.incrementAndGet();
        }
    }

    private static class FileChannelManagerCleanupRunner {
        private FileChannelManagerCleanupRunner() {
        }

        public static void main(String[] args) throws Exception {
            boolean callerHasHook = Boolean.parseBoolean(args[0]);
            String tmpDirectory = args[1];
            String signalFilePath = args[2];
            LOG.info("The FileChannelManagerCleanupRunner process has started");
            FileChannelManagerImpl manager = new FileChannelManagerImpl(new String[]{tmpDirectory}, FileChannelManagerImplTest.DIR_NAME_PREFIX);
            if (callerHasHook) {
                ShutdownHookUtil.addShutdownHook(() -> FileChannelManagerCleanupRunner.lambda$main$0((FileChannelManager)manager), (String)"Caller", (Logger)LOG);
            }
            LOG.info("The FileChannelManagerCleanupRunner is going to create the new file");
            new File(signalFilePath).createNewFile();
            LOG.info("The FileChannelManagerCleanupRunner has created the new file");
            Thread.sleep(3L * TEST_TIMEOUT.toMillis());
        }

        private static /* synthetic */ void lambda$main$0(FileChannelManager manager) throws Exception {
            manager.close();
        }
    }

    private static class FileChannelManagerTestProcess
    extends TestJvmProcess {
        private final boolean callerHasHook;
        private final String tmpDirectories;
        private final String signalFilePath;

        FileChannelManagerTestProcess(boolean callerHasHook, String tmpDirectories, String signalFilePath) throws Exception {
            this.callerHasHook = callerHasHook;
            this.tmpDirectories = tmpDirectories;
            this.signalFilePath = signalFilePath;
        }

        @Override
        public String getName() {
            return "File Channel Manager Test";
        }

        @Override
        public String[] getMainMethodArgs() {
            return new String[]{Boolean.toString(this.callerHasHook), this.tmpDirectories, this.signalFilePath};
        }

        @Override
        public String getEntryPointClassName() {
            return FileChannelManagerCleanupRunner.class.getName();
        }
    }
}

