/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobKeyTest;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobServerGetTest;
import org.apache.flink.runtime.blob.BlobServerPutTest;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;

public final class TestingBlobHelpers {
    private TestingBlobHelpers() {
        throw new UnsupportedOperationException();
    }

    public static <T> int checkFilesExist(JobID jobId, Collection<? extends BlobKey> keys, T blobService, boolean doThrow) throws IOException {
        int numFiles = 0;
        for (BlobKey blobKey : keys) {
            PermanentBlobCache cache;
            File storageDir;
            if (blobService instanceof BlobServer) {
                BlobServer server = (BlobServer)blobService;
                storageDir = server.getStorageDir();
            } else if (blobService instanceof PermanentBlobCache) {
                cache = (PermanentBlobCache)blobService;
                storageDir = cache.getStorageDir();
            } else if (blobService instanceof TransientBlobCache) {
                cache = (TransientBlobCache)blobService;
                storageDir = cache.getStorageDir();
            } else {
                throw new UnsupportedOperationException("unsupported BLOB service class: " + blobService.getClass().getCanonicalName());
            }
            File blobFile = new File(BlobUtils.getStorageLocationPath((String)storageDir.getAbsolutePath(), (JobID)jobId, (BlobKey)blobKey));
            if (blobFile.exists()) {
                ++numFiles;
                continue;
            }
            if (!doThrow) continue;
            throw new IOException("File " + String.valueOf(blobFile) + " does not exist.");
        }
        return numFiles;
    }

    public static void checkFileCountForJob(int expectedCount, JobID jobId, PermanentBlobService blobService) throws IOException {
        File jobDir;
        if (blobService instanceof BlobServer) {
            BlobServer server = (BlobServer)blobService;
            jobDir = server.getStorageLocation(jobId, (BlobKey)new PermanentBlobKey()).getParentFile();
        } else {
            PermanentBlobCache cache = (PermanentBlobCache)blobService;
            jobDir = cache.getStorageLocation(jobId, (BlobKey)new PermanentBlobKey()).getParentFile();
        }
        File[] blobsForJob = jobDir.listFiles();
        if (blobsForJob == null) {
            if (expectedCount != 0) {
                throw new IOException("File " + String.valueOf(jobDir) + " does not exist.");
            }
        } else {
            ((AbstractIntegerAssert)Assertions.assertThat((int)blobsForJob.length).as("Too many/few files in job dir: " + String.valueOf(Arrays.asList(blobsForJob)), new Object[0])).isEqualTo(expectedCount);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void testGetFailsFromCorruptFile(Configuration config, BlobStore blobStore, File blobStorage) throws IOException {
        Random rnd = new Random();
        JobID jobId = new JobID();
        try (BlobServer server = new BlobServer(config, blobStorage, blobStore);){
            server.start();
            byte[] data = new byte[2000000];
            rnd.nextBytes(data);
            BlobKey key = BlobServerPutTest.put((BlobService)server, jobId, data, BlobKey.BlobType.PERMANENT_BLOB);
            Assertions.assertThat((Comparable)key).isNotNull();
            File blobFile = server.getStorageLocation(jobId, key);
            Assertions.assertThat((boolean)blobFile.delete()).isTrue();
            byte[] data2 = Arrays.copyOf(data, data.length);
            data2[0] = (byte)(data2[0] ^ 1);
            File tmpFile = Files.createTempFile("blob", ".jar", new FileAttribute[0]).toFile();
            try {
                FileUtils.writeByteArrayToFile((File)tmpFile, (byte[])data2);
                blobStore.put(tmpFile, jobId, key);
            }
            finally {
                tmpFile.delete();
            }
            Assertions.assertThatThrownBy(() -> BlobServerGetTest.get((BlobService)server, jobId, key)).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(IOException.class, (String)"data corruption")});
        }
    }

    public static void testGetFailsFromCorruptFile(JobID jobId, Configuration config, BlobStore blobStore, File blobStorage) throws IOException {
        TestingBlobHelpers.testGetFailsFromCorruptFile(jobId, BlobKey.BlobType.PERMANENT_BLOB, true, config, blobStore, blobStorage);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static void testGetFailsFromCorruptFile(@Nullable JobID jobId, BlobKey.BlobType blobType, boolean corruptOnHAStore, Configuration config, BlobStore blobStore, File blobStorage) throws IOException {
        ((AbstractBooleanAssert)Assertions.assertThat((!corruptOnHAStore || blobType == BlobKey.BlobType.PERMANENT_BLOB ? 1 : 0) != 0).as("Check HA setup for corrupt HA file", new Object[0])).isTrue();
        Random rnd = new Random();
        try (BlobServer server = new BlobServer(config, new File(blobStorage, "server"), blobStore);
             BlobCacheService cache = new BlobCacheService(config, new File(blobStorage, "cache"), (BlobView)(corruptOnHAStore ? blobStore : new VoidBlobStore()), new InetSocketAddress("localhost", server.getPort()));){
            server.start();
            byte[] data = new byte[2000000];
            rnd.nextBytes(data);
            BlobKey key = BlobServerPutTest.put((BlobService)server, jobId, data, blobType);
            Assertions.assertThat((Comparable)key).isNotNull();
            byte[] data2 = Arrays.copyOf(data, data.length);
            data2[0] = (byte)(data2[0] ^ 1);
            if (corruptOnHAStore) {
                File tmpFile = Files.createTempFile("blob", ".jar", new FileAttribute[0]).toFile();
                try {
                    FileUtils.writeByteArrayToFile((File)tmpFile, (byte[])data2);
                    blobStore.put(tmpFile, jobId, key);
                }
                finally {
                    tmpFile.delete();
                }
                File blobFile = server.getStorageLocation(jobId, key);
                Assertions.assertThat((boolean)blobFile.delete()).isTrue();
            } else {
                File blobFile = server.getStorageLocation(jobId, key);
                Assertions.assertThat((File)blobFile).exists();
                FileUtils.writeByteArrayToFile((File)blobFile, (byte[])data2);
            }
            Assertions.assertThatThrownBy(() -> BlobServerGetTest.get((BlobService)cache, jobId, key)).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(IOException.class, (String)"data corruption")});
        }
    }

    public static void testBlobServerRecovery(Configuration config, BlobStore blobStore, File blobStorage) throws Exception {
        String clusterId = (String)config.get(HighAvailabilityOptions.HA_CLUSTER_ID);
        String storagePath = (String)config.get(HighAvailabilityOptions.HA_STORAGE_PATH) + "/" + clusterId;
        Random rand = new Random();
        try (BlobServer server0 = new BlobServer(config, new File(blobStorage, "server0"), blobStore);
             BlobServer server1 = new BlobServer(config, new File(blobStorage, "server1"), blobStore);
             BlobCacheService cache1 = new BlobCacheService(config, new File(blobStorage, "cache1"), (BlobView)new VoidBlobStore(), new InetSocketAddress("localhost", server1.getPort()));){
            server0.start();
            server1.start();
            byte[] expected = new byte[1024];
            rand.nextBytes(expected);
            byte[] expected2 = Arrays.copyOfRange(expected, 32, 288);
            BlobKey[] keys = new BlobKey[2];
            JobID[] jobId = new JobID[]{new JobID(), new JobID()};
            keys[0] = BlobServerPutTest.put((BlobService)server0, jobId[0], expected, BlobKey.BlobType.PERMANENT_BLOB);
            keys[1] = BlobServerPutTest.put((BlobService)server0, jobId[1], expected2, BlobKey.BlobType.PERMANENT_BLOB);
            BlobKey nonHAKey = BlobServerPutTest.put((BlobService)server0, jobId[0], expected2, BlobKey.BlobType.TRANSIENT_BLOB);
            BlobKeyTest.verifyKeyDifferentHashEquals(keys[1], nonHAKey);
            Path blobServerPath = new Path(storagePath, "blob");
            FileSystem fs = blobServerPath.getFileSystem();
            Assertions.assertThat((boolean)fs.exists(blobServerPath)).isTrue();
            BlobServerPutTest.verifyContents((BlobService)cache1, jobId[0], keys[0], expected);
            BlobServerPutTest.verifyContents((BlobService)cache1, jobId[1], keys[1], expected2);
            BlobServerGetTest.verifyDeleted((BlobService)cache1, jobId[0], nonHAKey);
            server1.globalCleanupAsync(jobId[0], Executors.directExecutor()).join();
            server1.globalCleanupAsync(jobId[1], Executors.directExecutor()).join();
            Assertions.assertThat((boolean)fs.exists(new Path(storagePath))).isTrue();
            if (fs.exists(blobServerPath)) {
                FileStatus[] recoveryFiles = fs.listStatus(blobServerPath);
                ArrayList<String> filenames = new ArrayList<String>(recoveryFiles.length);
                for (FileStatus file : recoveryFiles) {
                    filenames.add(file.toString());
                }
                Assertions.fail((String)"Unclean state backend: %s", (Object[])new Object[]{filenames});
            }
        }
    }

    public static void testBlobCacheRecovery(Configuration config, BlobStore blobStore, File blobStorage) throws IOException {
        String clusterId = (String)config.get(HighAvailabilityOptions.HA_CLUSTER_ID);
        String storagePath = (String)config.get(HighAvailabilityOptions.HA_STORAGE_PATH) + "/" + clusterId;
        Random rand = new Random();
        try (BlobServer server0 = new BlobServer(config, new File(blobStorage, "server0"), blobStore);
             BlobServer server1 = new BlobServer(config, new File(blobStorage, "server1"), blobStore);
             BlobCacheService cache0 = new BlobCacheService(config, new File(blobStorage, "cache0"), (BlobView)new VoidBlobStore(), new InetSocketAddress("localhost", server0.getPort()));
             BlobCacheService cache1 = new BlobCacheService(config, new File(blobStorage, "cache1"), (BlobView)new VoidBlobStore(), new InetSocketAddress("localhost", server1.getPort()));){
            server0.start();
            server1.start();
            byte[] expected = new byte[1024];
            rand.nextBytes(expected);
            byte[] expected2 = Arrays.copyOfRange(expected, 32, 288);
            BlobKey[] keys = new BlobKey[2];
            JobID[] jobId = new JobID[]{new JobID(), new JobID()};
            keys[0] = BlobServerPutTest.put((BlobService)cache0, jobId[0], expected, BlobKey.BlobType.PERMANENT_BLOB);
            keys[1] = BlobServerPutTest.put((BlobService)cache0, jobId[1], expected2, BlobKey.BlobType.PERMANENT_BLOB);
            BlobKey nonHAKey = BlobServerPutTest.put((BlobService)cache0, jobId[0], expected2, BlobKey.BlobType.TRANSIENT_BLOB);
            BlobKeyTest.verifyKeyDifferentHashDifferent(keys[0], nonHAKey);
            BlobKeyTest.verifyKeyDifferentHashEquals(keys[1], nonHAKey);
            Path blobServerPath = new Path(storagePath, "blob");
            FileSystem fs = blobServerPath.getFileSystem();
            Assertions.assertThat((boolean)fs.exists(blobServerPath)).isTrue();
            BlobServerPutTest.verifyContents((BlobService)cache1, jobId[0], keys[0], expected);
            BlobServerPutTest.verifyContents((BlobService)cache1, jobId[1], keys[1], expected2);
            BlobServerGetTest.verifyDeleted((BlobService)cache1, jobId[0], nonHAKey);
        }
    }
}

