/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.File;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobClientTest;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobKeyTest;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobServerPutTest;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.TestingBlobStore;
import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
import org.apache.flink.runtime.blob.TestingBlobUtils;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.Reference;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.TriFunctionWithException;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class BlobServerGetTest {
    private final Random rnd = new Random();
    @TempDir
    private Path tempDir;

    BlobServerGetTest() {
    }

    @Test
    void testGetTransientFailsDuringLookup1() throws IOException {
        this.testGetFailsDuringLookup(null, new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    void testGetTransientFailsDuringLookup2() throws IOException {
        this.testGetFailsDuringLookup(new JobID(), new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    void testGetTransientFailsDuringLookup3() throws IOException {
        this.testGetFailsDuringLookup(new JobID(), null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    void testGetPermanentFailsDuringLookup() throws IOException {
        this.testGetFailsDuringLookup(new JobID(), new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    private void testGetFailsDuringLookup(@Nullable JobID jobId1, @Nullable JobID jobId2, BlobKey.BlobType blobType) throws IOException {
        try (BlobServer server = TestingBlobUtils.createServer(this.tempDir);){
            server.start();
            byte[] data = new byte[2000000];
            this.rnd.nextBytes(data);
            BlobKey key = BlobServerPutTest.put((BlobService)server, jobId1, data, blobType);
            Assertions.assertThat((Comparable)key).isNotNull();
            File blobFile = server.getStorageLocation(jobId1, key);
            Assertions.assertThat((boolean)blobFile.delete()).isTrue();
            BlobServerGetTest.verifyDeleted((BlobService)server, jobId1, key);
            BlobKey key2 = BlobServerPutTest.put((BlobService)server, jobId2, data, blobType);
            Assertions.assertThat((Comparable)key2).isNotNull();
            BlobKeyTest.verifyKeyDifferentHashEquals(key, key2);
            BlobServerGetTest.get((BlobService)server, jobId2, key2);
            BlobServerGetTest.verifyDeleted((BlobService)server, jobId1, key);
            blobFile = server.getStorageLocation(jobId2, key2);
            Assertions.assertThat((boolean)blobFile.delete()).isTrue();
            BlobServerGetTest.verifyDeleted((BlobService)server, jobId2, key2);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testGetFailsIncomingForJobHa() throws IOException {
        ((AbstractBooleanAssert)Assumptions.assumeThat((boolean)OperatingSystem.isWindows()).as("setWritable doesn't work on Windows", new Object[0])).isFalse();
        JobID jobId = new JobID();
        Configuration config = new Configuration();
        config.set(HighAvailabilityOptions.HA_MODE, (Object)"ZOOKEEPER");
        config.set(HighAvailabilityOptions.HA_STORAGE_PATH, (Object)TempDirUtils.newFolder((Path)this.tempDir).getPath());
        BlobStoreService blobStore = null;
        try {
            blobStore = BlobUtils.createBlobStoreFromConfig((Configuration)config);
            File tempFileDir = null;
            try (BlobServer server = TestingBlobUtils.createServer(this.tempDir, config, (BlobStore)blobStore);){
                server.start();
                byte[] data = new byte[2000000];
                this.rnd.nextBytes(data);
                BlobKey blobKey = BlobServerPutTest.put((BlobService)server, jobId, data, BlobKey.BlobType.PERMANENT_BLOB);
                Assertions.assertThat((boolean)server.getStorageLocation(jobId, blobKey).delete()).isTrue();
                tempFileDir = server.createTemporaryFilename().getParentFile();
                Assertions.assertThat((boolean)tempFileDir.setExecutable(true, false)).isTrue();
                Assertions.assertThat((boolean)tempFileDir.setReadable(true, false)).isTrue();
                Assertions.assertThat((boolean)tempFileDir.setWritable(false, false)).isTrue();
                try {
                    Assertions.assertThatThrownBy(() -> BlobServerGetTest.get((BlobService)server, jobId, blobKey)).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(IOException.class, (String)"Permission denied")});
                }
                finally {
                    HashSet<Object> expectedDirs = new HashSet<Object>();
                    expectedDirs.add("incoming");
                    expectedDirs.add("job_" + String.valueOf(jobId));
                    File storageDir = tempFileDir.getParentFile();
                    Object[] actualDirs = storageDir.list();
                    Assertions.assertThat((Object[])actualDirs).isNotNull();
                    Assertions.assertThat((Object[])actualDirs).isNotEmpty();
                    Assertions.assertThat(new HashSet<Object>(Arrays.asList(actualDirs))).isEqualTo(expectedDirs);
                    File jobDir = new File(tempFileDir.getParentFile(), "job_" + String.valueOf(jobId));
                    Assertions.assertThat((Object[])jobDir.list()).isEmpty();
                }
            }
            finally {
                if (tempFileDir != null) {
                    tempFileDir.setWritable(true, false);
                }
            }
        }
        finally {
            if (blobStore != null) {
                blobStore.cleanupAllData();
                blobStore.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testGetFailsStoreForJobHa() throws IOException {
        ((AbstractBooleanAssert)Assumptions.assumeThat((boolean)OperatingSystem.isWindows()).as("setWritable doesn't work on Windows", new Object[0])).isFalse();
        JobID jobId = new JobID();
        Configuration config = new Configuration();
        config.set(HighAvailabilityOptions.HA_MODE, (Object)"ZOOKEEPER");
        config.set(HighAvailabilityOptions.HA_STORAGE_PATH, (Object)TempDirUtils.newFolder((Path)this.tempDir).getPath());
        BlobStoreService blobStore = null;
        try {
            blobStore = BlobUtils.createBlobStoreFromConfig((Configuration)config);
            File jobStoreDir = null;
            try (BlobServer server = TestingBlobUtils.createServer(this.tempDir, config, (BlobStore)blobStore);){
                server.start();
                byte[] data = new byte[2000000];
                this.rnd.nextBytes(data);
                BlobKey blobKey = BlobServerPutTest.put((BlobService)server, jobId, data, BlobKey.BlobType.PERMANENT_BLOB);
                Assertions.assertThat((boolean)server.getStorageLocation(jobId, blobKey).delete()).isTrue();
                jobStoreDir = server.getStorageLocation(jobId, blobKey).getParentFile();
                Assertions.assertThat((boolean)jobStoreDir.setExecutable(true, false)).isTrue();
                Assertions.assertThat((boolean)jobStoreDir.setReadable(true, false)).isTrue();
                Assertions.assertThat((boolean)jobStoreDir.setWritable(false, false)).isTrue();
                try {
                    Assertions.assertThatThrownBy(() -> BlobServerGetTest.get((BlobService)server, jobId, blobKey)).isInstanceOf(AccessDeniedException.class);
                }
                finally {
                    File incomingFileDir = new File(jobStoreDir.getParent(), "incoming");
                    Assertions.assertThat((Object[])incomingFileDir.list()).isEmpty();
                    Assertions.assertThat((Object[])jobStoreDir.list()).isEmpty();
                }
            }
            finally {
                if (jobStoreDir != null) {
                    jobStoreDir.setWritable(true, false);
                }
            }
        }
        finally {
            if (blobStore != null) {
                blobStore.cleanupAllData();
                blobStore.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testGetFailsHaStoreForJobHa() throws IOException {
        JobID jobId = new JobID();
        try (BlobServer server = TestingBlobUtils.createServer(this.tempDir);){
            server.start();
            byte[] data = new byte[2000000];
            this.rnd.nextBytes(data);
            BlobKey blobKey = BlobServerPutTest.put((BlobService)server, jobId, data, BlobKey.BlobType.PERMANENT_BLOB);
            Assertions.assertThat((boolean)server.getStorageLocation(jobId, blobKey).delete()).isTrue();
            File tempFileDir = server.createTemporaryFilename().getParentFile();
            try {
                Assertions.assertThatThrownBy(() -> BlobServerGetTest.get((BlobService)server, jobId, blobKey)).isInstanceOf(NoSuchFileException.class);
            }
            finally {
                HashSet<Object> expectedDirs = new HashSet<Object>();
                expectedDirs.add("incoming");
                expectedDirs.add("job_" + String.valueOf(jobId));
                File storageDir = tempFileDir.getParentFile();
                Object[] actualDirs = storageDir.list();
                Assertions.assertThat((Object[])actualDirs).isNotNull();
                Assertions.assertThat((Object[])actualDirs).isNotEmpty();
                Assertions.assertThat(new HashSet<Object>(Arrays.asList(actualDirs))).isEqualTo(expectedDirs);
                File jobDir = new File(tempFileDir.getParentFile(), "job_" + String.valueOf(jobId));
                Assertions.assertThat((Object[])jobDir.list()).isEmpty();
            }
        }
    }

    @Test
    void testConcurrentGetOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
        this.testConcurrentGetOperations(null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    void testConcurrentGetOperationsForJob() throws IOException, ExecutionException, InterruptedException {
        this.testConcurrentGetOperations(new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    void testConcurrentGetOperationsForJobHa() throws IOException, ExecutionException, InterruptedException {
        this.testConcurrentGetOperations(new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    @Test
    void testGetChecksForCorruptionInPermanentBlobInCaseOfRestart() throws IOException {
        this.runGetChecksForCorruptionInCaseOfRestartTest(BlobKey.BlobType.PERMANENT_BLOB);
    }

    @Test
    void testGetChecksForCorruptionInTransientBlobInCaseOfRestart() throws IOException {
        this.runGetChecksForCorruptionInCaseOfRestartTest(BlobKey.BlobType.TRANSIENT_BLOB);
    }

    private void runGetChecksForCorruptionInCaseOfRestartTest(BlobKey.BlobType blobType) throws IOException {
        JobID jobId = JobID.generate();
        byte[] data = new byte[]{1, 2, 3};
        byte[] corruptedData = new byte[]{3, 2, 1};
        File storageDir = TempDirUtils.newFolder((Path)this.tempDir);
        try (BlobServer blobServer = new BlobServer(new Configuration(), Reference.borrowed((Object)storageDir), (BlobStore)new VoidBlobStore());){
            BlobKey blobKey = BlobServerPutTest.put((BlobService)blobServer, jobId, data, blobType);
            blobServer.close();
            File blob = blobServer.getStorageLocation(jobId, blobKey);
            FileUtils.writeByteArrayToFile((File)blob, (byte[])corruptedData);
            try (BlobServer restartedBlobServer = new BlobServer(new Configuration(), Reference.borrowed((Object)storageDir), (BlobStore)new VoidBlobStore());){
                Assertions.assertThatThrownBy(() -> BlobServerGetTest.get((BlobService)restartedBlobServer, jobId, blobKey)).isInstanceOf(IOException.class);
            }
        }
    }

    @Test
    void testGetReDownloadsCorruptedPermanentBlobFromBlobStoreInCaseOfRestart() throws IOException {
        JobID jobId = JobID.generate();
        byte[] data = new byte[]{1, 2, 3};
        byte[] corruptedData = new byte[]{3, 2, 1};
        File storageDir = TempDirUtils.newFolder((Path)this.tempDir);
        OneShotLatch getCalled = new OneShotLatch();
        TestingBlobStore blobStore = new TestingBlobStoreBuilder().setGetFunction((TriFunctionWithException<JobID, BlobKey, File, Boolean, IOException>)((TriFunctionWithException)(jobID, blobKey, file) -> {
            getCalled.trigger();
            FileUtils.writeByteArrayToFile((File)file, (byte[])data);
            return true;
        })).createTestingBlobStore();
        try (BlobServer blobServer = new BlobServer(new Configuration(), Reference.borrowed((Object)storageDir), (BlobStore)blobStore);){
            BlobKey blobKey2 = BlobServerPutTest.put((BlobService)blobServer, jobId, data, BlobKey.BlobType.PERMANENT_BLOB);
            blobServer.close();
            File blob = blobServer.getStorageLocation(jobId, blobKey2);
            FileUtils.writeByteArrayToFile((File)blob, (byte[])corruptedData);
            try (BlobServer restartedBlobServer = new BlobServer(new Configuration(), Reference.borrowed((Object)storageDir), (BlobStore)blobStore);){
                File file2 = BlobServerGetTest.get((BlobService)restartedBlobServer, jobId, blobKey2);
                BlobClientTest.validateGetAndClose(Files.newInputStream(file2.toPath(), new OpenOption[0]), data);
                Assertions.assertThat((boolean)getCalled.isTriggered()).isTrue();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testConcurrentGetOperations(@Nullable JobID jobId, BlobKey.BlobType blobType) throws IOException, InterruptedException, ExecutionException {
        byte[] data = new byte[]{1, 2, 3, 4, 99, 42};
        TestingBlobStore blobStore = new TestingBlobStoreBuilder().setGetFunction((TriFunctionWithException<JobID, BlobKey, File, Boolean, IOException>)((TriFunctionWithException)(jobID, blobKey, file) -> {
            FileUtils.writeByteArrayToFile((File)file, (byte[])data);
            return true;
        })).createTestingBlobStore();
        int numberConcurrentGetOperations = 3;
        ArrayList<CompletableFuture<File>> getOperations = new ArrayList<CompletableFuture<File>>(3);
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try (BlobServer server = TestingBlobUtils.createServer(this.tempDir, new Configuration(), blobStore);){
            server.start();
            BlobKey blobKey2 = BlobServerPutTest.put((BlobService)server, jobId, data, blobType);
            if (blobType == BlobKey.BlobType.PERMANENT_BLOB) {
                Assertions.assertThat((boolean)server.getStorageLocation(jobId, blobKey2).delete()).isTrue();
            }
            for (int i = 0; i < 3; ++i) {
                CompletableFuture<File> getOperation = CompletableFuture.supplyAsync(() -> {
                    try {
                        File file = BlobServerGetTest.get((BlobService)server, jobId, blobKey2);
                        BlobClientTest.validateGetAndClose(Files.newInputStream(file.toPath(), new OpenOption[0]), data);
                        return file;
                    }
                    catch (IOException e) {
                        throw new CompletionException((Throwable)new FlinkException("Could not read blob for key " + String.valueOf(blobKey2) + ".", (Throwable)e));
                    }
                }, executor);
                getOperations.add(getOperation);
            }
            FutureUtils.ConjunctFuture filesFuture = FutureUtils.combineAll(getOperations);
            filesFuture.get();
        }
        finally {
            executor.shutdownNow();
        }
    }

    static File get(BlobService service, @Nullable JobID jobId, BlobKey key) throws IOException {
        if (key instanceof PermanentBlobKey) {
            return service.getPermanentBlobService().getFile(jobId, (PermanentBlobKey)key);
        }
        if (jobId == null) {
            return service.getTransientBlobService().getFile((TransientBlobKey)key);
        }
        return service.getTransientBlobService().getFile(jobId, (TransientBlobKey)key);
    }

    static void verifyDeleted(BlobService service, @Nullable JobID jobId, BlobKey key) {
        Assertions.assertThatThrownBy(() -> BlobServerGetTest.get(service, jobId, key)).isInstanceOf(IOException.class);
    }
}

