/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.blob.BlobCachePutTest;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobClientTest;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobKeyTest;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobServerDeleteTest;
import org.apache.flink.runtime.blob.BlobServerGetTest;
import org.apache.flink.runtime.blob.BlobServerPutTest;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.TestingBlobUtils;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.assertj.core.api.ObjectArrayAssert;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests the GET path of a {@link BlobCacheService} that is backed by a {@link BlobServer}.
 *
 * <p>Every test creates a fresh server/cache pair below the JUnit-managed temporary directory via
 * {@link TestingBlobUtils#createServerAndCache}, uploads a blob to the server and then fetches it
 * through the cache while provoking a failure (deleted files, read-only directories) or while
 * issuing several requests concurrently.
 */
class BlobCacheGetTest {

    /** Size in bytes of the random payload uploaded by the non-concurrent tests. */
    private static final int BLOB_SIZE = 2000000;

    @TempDir
    private Path tempDir;

    private final Random rnd = new Random();

    /** Skips the calling test on Windows, where the permission-based setup is not usable. */
    private static void assumeNotWindows() {
        Assumptions.assumeThat(OperatingSystem.isWindows())
                .as("setWritable doesn't work on Windows")
                .isFalse();
    }

    /** Returns a new array of {@link #BLOB_SIZE} random bytes drawn from {@link #rnd}. */
    private byte[] randomBlobData() {
        byte[] data = new byte[BLOB_SIZE];
        this.rnd.nextBytes(data);
        return data;
    }

    @Test
    void testGetTransientFailsDuringLookup1() throws IOException, InterruptedException {
        this.testGetFailsDuringLookup(null, new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    void testGetTransientFailsDuringLookup2() throws IOException, InterruptedException {
        this.testGetFailsDuringLookup(new JobID(), new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    void testGetTransientFailsDuringLookup3() throws IOException, InterruptedException {
        this.testGetFailsDuringLookup(new JobID(), null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    void testGetFailsDuringLookupHa() throws IOException, InterruptedException {
        this.testGetFailsDuringLookup(new JobID(), new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    /**
     * Checks GET behaviour when the requested blob file is missing on the server.
     *
     * <p>The same payload is uploaded under two job ids. The file of the first upload is removed
     * from the server's storage, so requests for it must be reported as deleted, while the second
     * upload must remain retrievable until its files are removed as well.
     *
     * @param jobId1 job id of the upload whose file is deleted right away (may be {@code null})
     * @param jobId2 job id of the second upload of the same data (may be {@code null})
     * @param blobType whether a transient or a permanent blob is used
     */
    private void testGetFailsDuringLookup(
            @Nullable JobID jobId1, @Nullable JobID jobId2, BlobKey.BlobType blobType)
            throws IOException, InterruptedException {
        Tuple2<BlobServer, BlobCacheService> serverAndCache =
                TestingBlobUtils.createServerAndCache(this.tempDir);
        try (BlobServer server = serverAndCache.f0;
                BlobCacheService cache = serverAndCache.f1) {
            server.start();
            byte[] data = randomBlobData();

            BlobKey key = BlobServerPutTest.put(server, jobId1, data, blobType);
            Assertions.assertThat(key).isNotNull();
            BlobKeyTest.verifyType(blobType, key);

            // remove the file from the server's storage so that a GET cannot be served
            File blobFile = server.getStorageLocation(jobId1, key);
            Assertions.assertThat(blobFile.delete()).isTrue();
            BlobServerGetTest.verifyDeleted(cache, jobId1, key);

            // upload the same data again under the second job id
            BlobKey key2 = BlobServerPutTest.put(server, jobId2, data, blobType);
            Assertions.assertThat(key2).isNotNull();
            BlobKeyTest.verifyKeyDifferentHashEquals(key, key2);

            // the second upload is retrievable, the first one still is not
            BlobServerGetTest.get(cache, jobId2, key2);
            BlobServerGetTest.verifyDeleted(cache, jobId1, key);

            if (blobType == BlobKey.BlobType.PERMANENT_BLOB) {
                // the server keeps its copy after the GET
                Assertions.assertThat(server.getStorageLocation(jobId2, key2)).exists();

                // drop the cached copy; another GET must succeed again
                blobFile = cache.getPermanentBlobService().getStorageLocation(jobId2, key2);
                Assertions.assertThat(blobFile.delete()).isTrue();
                BlobServerGetTest.get(cache, jobId2, key2);

                // drop the copies on both sides; now the blob must be gone
                blobFile = cache.getPermanentBlobService().getStorageLocation(jobId2, key2);
                Assertions.assertThat(blobFile.delete()).isTrue();
                blobFile = server.getStorageLocation(jobId2, key2);
                Assertions.assertThat(blobFile.delete()).isTrue();
                BlobServerGetTest.verifyDeleted(cache, jobId2, key2);
            } else {
                // the server copy is expected to disappear after the GET above
                BlobCachePutTest.verifyDeletedEventually(server, jobId2, key2);

                // drop the cached copy; now the blob must be gone
                blobFile = cache.getTransientBlobService().getStorageLocation(jobId2, key2);
                Assertions.assertThat(blobFile.delete()).isTrue();
                BlobServerGetTest.verifyDeleted(cache, jobId2, key2);
            }
        }
    }

    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testGetFailsIncomingNoJob() throws IOException {
        this.testGetFailsIncoming(null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testGetFailsIncomingForJob() throws IOException {
        this.testGetFailsIncoming(new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testGetFailsIncomingForJobHa() throws IOException {
        this.testGetFailsIncoming(new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    /**
     * Checks GET behaviour when the cache's directory for temporary files is not writable.
     *
     * <p>The GET is expected to fail with an {@link IOException} whose message starts with
     * "Failed to fetch BLOB". Afterwards the cache's storage directory must contain exactly the
     * "incoming" directory and an empty job directory, and the blob must still exist on the
     * server.
     *
     * @param jobId job id to use, or {@code null} for a job-unrelated blob
     * @param blobType whether a transient or a permanent blob is used
     */
    private void testGetFailsIncoming(@Nullable JobID jobId, BlobKey.BlobType blobType)
            throws IOException {
        assumeNotWindows();
        Tuple2<BlobServer, BlobCacheService> serverAndCache =
                TestingBlobUtils.createServerAndCache(this.tempDir);
        File tempFileDir = null;
        try (BlobServer server = serverAndCache.f0;
                BlobCacheService cache = serverAndCache.f1) {
            server.start();
            byte[] data = randomBlobData();
            BlobKey blobKey = BlobServerPutTest.put(server, jobId, data, blobType);
            BlobKeyTest.verifyType(blobType, blobKey);

            // locate the directory in which the cache creates its temporary files
            if (blobType == BlobKey.BlobType.PERMANENT_BLOB) {
                tempFileDir =
                        cache.getPermanentBlobService().createTemporaryFilename().getParentFile();
            } else {
                tempFileDir =
                        cache.getTransientBlobService().createTemporaryFilename().getParentFile();
            }

            // make that directory readable and traversable, but not writable
            Assertions.assertThat(tempFileDir.setExecutable(true, false)).isTrue();
            Assertions.assertThat(tempFileDir.setReadable(true, false)).isTrue();
            Assertions.assertThat(tempFileDir.setWritable(false, false)).isTrue();

            try {
                Assertions.assertThatThrownBy(() -> BlobServerGetTest.get(cache, jobId, blobKey))
                        .isInstanceOf(IOException.class)
                        .hasMessageStartingWith("Failed to fetch BLOB");
            } finally {
                File storageDir = tempFileDir.getParentFile();
                String jobDirName = jobId != null ? "job_" + jobId : "no_job";

                HashSet<String> expectedDirs = new HashSet<>();
                expectedDirs.add("incoming");
                expectedDirs.add(jobDirName);

                // nothing may have been stored for the job ...
                Assertions.assertThat(new File(storageDir, jobDirName).list()).isEmpty();
                // ... and no other entries may show up next to the expected directories
                Assertions.assertThat(storageDir.list())
                        .isNotNull()
                        .isNotEmpty()
                        .containsExactlyInAnyOrderElementsOf(expectedDirs);

                // the failed GET must not have removed the blob from the server
                Assertions.assertThat(server.getStorageLocation(jobId, blobKey)).exists();
            }
        } finally {
            // restore write access so that the temporary directory can be cleaned up
            if (tempFileDir != null) {
                tempFileDir.setWritable(true, false);
            }
        }
    }

    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testGetTransientFailsStoreNoJob() throws IOException, InterruptedException {
        this.testGetFailsStore(null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testGetTransientFailsStoreForJob() throws IOException, InterruptedException {
        this.testGetFailsStore(new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testGetPermanentFailsStoreForJob() throws IOException, InterruptedException {
        this.testGetFailsStore(new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    /**
     * Checks GET behaviour when the cache's final storage directory for the job is not writable.
     *
     * <p>The GET is expected to fail with an {@link AccessDeniedException}. Afterwards neither the
     * "incoming" directory nor the job's storage directory of the cache may contain files. A
     * transient blob is expected to vanish from the server eventually, a permanent blob must
     * still exist there.
     *
     * @param jobId job id to use, or {@code null} for a job-unrelated blob
     * @param blobType whether a transient or a permanent blob is used
     */
    private void testGetFailsStore(@Nullable JobID jobId, BlobKey.BlobType blobType)
            throws IOException, InterruptedException {
        assumeNotWindows();
        Tuple2<BlobServer, BlobCacheService> serverAndCache =
                TestingBlobUtils.createServerAndCache(this.tempDir);
        File jobStoreDir = null;
        try (BlobServer server = serverAndCache.f0;
                BlobCacheService cache = serverAndCache.f1) {
            server.start();
            byte[] data = randomBlobData();
            BlobKey blobKey = BlobServerPutTest.put(server, jobId, data, blobType);
            BlobKeyTest.verifyType(blobType, blobKey);

            // a throw-away key is enough to find the directory blobs of this job are stored in
            if (blobType == BlobKey.BlobType.PERMANENT_BLOB) {
                jobStoreDir =
                        cache.getPermanentBlobService()
                                .getStorageLocation(jobId, new PermanentBlobKey())
                                .getParentFile();
            } else {
                jobStoreDir =
                        cache.getTransientBlobService()
                                .getStorageLocation(jobId, new TransientBlobKey())
                                .getParentFile();
            }

            // make that directory readable and traversable, but not writable
            Assertions.assertThat(jobStoreDir.setExecutable(true, false)).isTrue();
            Assertions.assertThat(jobStoreDir.setReadable(true, false)).isTrue();
            Assertions.assertThat(jobStoreDir.setWritable(false, false)).isTrue();

            try {
                Assertions.assertThatThrownBy(() -> BlobServerGetTest.get(cache, jobId, blobKey))
                        .isInstanceOf(AccessDeniedException.class);
            } finally {
                // verified on success and on failure alike: no left-over files in the cache
                File incomingFileDir = new File(jobStoreDir.getParent(), "incoming");
                Assertions.assertThat(incomingFileDir.list()).isEmpty();
                Assertions.assertThat(jobStoreDir.list()).isEmpty();

                if (blobType == BlobKey.BlobType.TRANSIENT_BLOB) {
                    BlobCachePutTest.verifyDeletedEventually(server, jobId, blobKey);
                } else {
                    Assertions.assertThat(server.getStorageLocation(jobId, blobKey)).exists();
                }
            }
        } finally {
            // restore write access so that the temporary directory can be cleaned up
            if (jobStoreDir != null) {
                jobStoreDir.setWritable(true, false);
            }
        }
    }

    /**
     * Checks GET behaviour for a permanent blob whose file was deleted from the server's storage.
     *
     * <p>The GET is expected to fail with an {@link IOException} whose message starts with
     * "Failed to fetch BLOB". Afterwards the server's storage directory must contain exactly the
     * "incoming" directory and an empty job directory.
     */
    @Test
    void testGetFailsHaStoreForJobHa() throws IOException {
        JobID jobId = new JobID();
        Tuple2<BlobServer, BlobCacheService> serverAndCache =
                TestingBlobUtils.createServerAndCache(this.tempDir);
        try (BlobServer server = serverAndCache.f0;
                BlobCacheService cache = serverAndCache.f1) {
            server.start();
            byte[] data = randomBlobData();
            PermanentBlobKey blobKey =
                    (PermanentBlobKey)
                            BlobServerPutTest.put(
                                    server, jobId, data, BlobKey.BlobType.PERMANENT_BLOB);

            // remove the only copy the server has in its local storage
            Assertions.assertThat(server.getStorageLocation(jobId, blobKey).delete()).isTrue();

            File tempFileDir = server.createTemporaryFilename().getParentFile();
            try {
                Assertions.assertThatThrownBy(() -> BlobServerGetTest.get(cache, jobId, blobKey))
                        .isInstanceOf(IOException.class)
                        .hasMessageStartingWith("Failed to fetch BLOB");
            } finally {
                File storageDir = tempFileDir.getParentFile();

                HashSet<String> expectedDirs = new HashSet<>();
                expectedDirs.add("incoming");
                expectedDirs.add("job_" + jobId);

                Assertions.assertThat(storageDir.list())
                        .isNotNull()
                        .isNotEmpty()
                        .containsExactlyInAnyOrderElementsOf(expectedDirs);

                File jobDir = new File(storageDir, "job_" + jobId);
                Assertions.assertThat(jobDir.list()).isEmpty();
            }
        }
    }

    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testGetTransientRemoteDeleteFailsNoJob() throws IOException {
        this.testGetTransientRemoteDeleteFails(null);
    }

    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testGetTransientRemoteDeleteFailsForJob() throws IOException {
        this.testGetTransientRemoteDeleteFails(new JobID());
    }

    /**
     * Checks GET behaviour for a transient blob whose file on the server is write-protected.
     *
     * <p>Both the server-side blob file and its directory are made read-only. The test then
     * expects the blob to stay readable through the cache and through the server, also after the
     * blob has been deleted at the cache.
     *
     * @param jobId job id to use, or {@code null} for a job-unrelated blob
     */
    private void testGetTransientRemoteDeleteFails(@Nullable JobID jobId) throws IOException {
        assumeNotWindows();
        Tuple2<BlobServer, BlobCacheService> serverAndCache =
                TestingBlobUtils.createServerAndCache(this.tempDir);
        try (BlobServer server = serverAndCache.f0;
                BlobCacheService cache = serverAndCache.f1) {
            server.start();
            File blobFile = null;
            File directory = null;
            try {
                byte[] data = randomBlobData();
                TransientBlobKey key =
                        (TransientBlobKey)
                                BlobServerPutTest.put(
                                        server, jobId, data, BlobKey.BlobType.TRANSIENT_BLOB);
                Assertions.assertThat(key).isNotNull();

                // write-protect the server's copy and the directory holding it
                blobFile = server.getStorageLocation(jobId, key);
                directory = blobFile.getParentFile();
                Assertions.assertThat(blobFile.setWritable(false, false)).isTrue();
                Assertions.assertThat(directory.setWritable(false, false)).isTrue();

                // fetch through the cache, then delete the blob at the cache
                BlobServerPutTest.verifyContents(cache, jobId, key, data);
                Assertions.assertThat(BlobServerDeleteTest.delete(cache, jobId, key)).isTrue();
                File blobFileAtCache =
                        cache.getTransientBlobService().getStorageLocation(jobId, key);
                Assertions.assertThat(blobFileAtCache).doesNotExist();

                // the blob must still be served by the server and, again, via the cache
                BlobServerPutTest.verifyContents(server, jobId, key, data);
                BlobServerPutTest.verifyContents(cache, jobId, key, data);
            } finally {
                // restore write access, each file on its own, so that cleanup can succeed
                if (blobFile != null) {
                    blobFile.setWritable(true, false);
                }
                if (directory != null) {
                    directory.setWritable(true, false);
                }
            }
        }
    }

    @Test
    void testConcurrentGetOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
        this.testConcurrentGetOperations(null, BlobKey.BlobType.TRANSIENT_BLOB, false);
    }

    @Test
    void testConcurrentGetOperationsForJob() throws IOException, ExecutionException, InterruptedException {
        this.testConcurrentGetOperations(new JobID(), BlobKey.BlobType.TRANSIENT_BLOB, false);
    }

    @Test
    void testConcurrentGetOperationsForJobHa() throws IOException, ExecutionException, InterruptedException {
        this.testConcurrentGetOperations(new JobID(), BlobKey.BlobType.PERMANENT_BLOB, false);
    }

    @Test
    void testConcurrentGetOperationsForJobHa2() throws IOException, ExecutionException, InterruptedException {
        this.testConcurrentGetOperations(new JobID(), BlobKey.BlobType.PERMANENT_BLOB, true);
    }

    /**
     * Issues several GET requests for the same blob concurrently and validates the returned
     * content.
     *
     * <p>For a permanent blob every request must succeed. For a transient blob at least one
     * request must succeed; requests whose failure has a {@link FileNotFoundException} as root
     * cause are tolerated (presumably because an earlier GET already caused the transient blob to
     * be removed from the server).
     *
     * @param jobId job id to use, or {@code null} for a job-unrelated blob
     * @param blobType whether a transient or a permanent blob is used
     * @param cacheAccessesHAStore whether the cache is created with the same {@link BlobStore}
     *     instance as the server ({@code true}) or with a separate one ({@code false})
     */
    private void testConcurrentGetOperations(
            @Nullable JobID jobId, BlobKey.BlobType blobType, boolean cacheAccessesHAStore)
            throws IOException, InterruptedException, ExecutionException {
        final BlobStore blobStoreServer = new VoidBlobStore();
        final BlobStore blobStoreCache = new VoidBlobStore();
        final int numberConcurrentGetOperations = 3;
        final ArrayList<CompletableFuture<File>> getOperations =
                new ArrayList<>(numberConcurrentGetOperations);
        final byte[] data = {1, 2, 3, 4, 99, 42};
        final ExecutorService executor =
                Executors.newFixedThreadPool(numberConcurrentGetOperations);

        // the executor exists from here on, so everything below is guarded by its shutdown
        try {
            Tuple2<BlobServer, BlobCacheService> serverAndCache =
                    TestingBlobUtils.createServerAndCache(
                            this.tempDir,
                            blobStoreServer,
                            cacheAccessesHAStore ? blobStoreServer : blobStoreCache);
            try (BlobServer server = serverAndCache.f0;
                    BlobCacheService cache = serverAndCache.f1) {
                server.start();
                BlobKey blobKey = BlobServerPutTest.put(server, jobId, data, blobType);

                for (int i = 0; i < numberConcurrentGetOperations; ++i) {
                    CompletableFuture<File> getOperation =
                            CompletableFuture.supplyAsync(
                                    () -> {
                                        try {
                                            File file =
                                                    BlobServerGetTest.get(cache, jobId, blobKey);
                                            BlobClientTest.validateGetAndClose(
                                                    Files.newInputStream(file.toPath()), data);
                                            return file;
                                        } catch (IOException e) {
                                            throw new CompletionException(
                                                    new FlinkException(
                                                            "Could not read blob for key "
                                                                    + blobKey
                                                                    + '.',
                                                            e));
                                        }
                                    },
                                    executor);
                    getOperations.add(getOperation);
                }

                FutureUtils.ConjunctFuture<?> filesFuture = FutureUtils.combineAll(getOperations);
                if (blobType == BlobKey.BlobType.PERMANENT_BLOB) {
                    // every single request has to succeed
                    filesFuture.get();
                } else {
                    int completedSuccessfully = 0;
                    for (CompletableFuture<File> getOperation : getOperations) {
                        try {
                            getOperation.get();
                            ++completedSuccessfully;
                        } catch (ExecutionException e) {
                            // only a missing file is an acceptable reason for a failed request;
                            // an InterruptedException is deliberately not caught here
                            Throwable rootCause =
                                    org.apache.commons.lang3.exception.ExceptionUtils.getRootCause(e);
                            if (!(rootCause instanceof FileNotFoundException)) {
                                ExceptionUtils.rethrowIOException(e);
                            }
                        }
                    }
                    Assertions.assertThat(completedSuccessfully).isGreaterThanOrEqualTo(1);
                }
            }
        } finally {
            executor.shutdownNow();
        }
    }
}

