/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.file.CopyOption;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.blob.FileSystemBlobStore;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Reference;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;

public class BlobUtils {
    /** Algorithm name passed to {@link MessageDigest#getInstance(String)} when hashing BLOBs. */
    private static final String HASHING_ALGORITHM = "SHA-1";
    /** File name prefix of every BLOB file; the string form of the BLOB key follows it. */
    private static final String BLOB_FILE_PREFIX = "blob_";
    /** Directory name prefix for job-related BLOBs; the string form of the job ID follows it. */
    static final String JOB_DIR_PREFIX = "job_";
    /** Name of the directory that holds BLOBs which are not associated with a job. */
    static final String NO_JOB_DIR_PREFIX = "no_job";
    // NOTE(review): not referenced anywhere in the visible part of this file - confirm whether it is still needed.
    private static final Random RANDOM = new Random();

    /**
     * Creates the blob store service that matches the given configuration.
     *
     * @param config configuration that is inspected for the high-availability mode
     * @return a file-system backed store when high availability is activated, otherwise a
     *     {@link VoidBlobStore}
     * @throws IOException if the file-system backed store cannot be created
     */
    public static BlobStoreService createBlobStoreFromConfig(Configuration config) throws IOException {
        final boolean highAvailability = HighAvailabilityMode.isHighAvailabilityModeActivated(config);
        return highAvailability ? createFileSystemBlobStore(config) : new VoidBlobStore();
    }

    /**
     * Creates a {@link FileSystemBlobStore} rooted at the cluster's high-availability storage path.
     *
     * @param configuration configuration from which the storage path is resolved
     * @return blob store service backed by the file system of the storage path
     * @throws IOException if the file system for the storage path cannot be obtained
     */
    private static BlobStoreService createFileSystemBlobStore(Configuration configuration) throws IOException {
        final Path haStoragePath = HighAvailabilityServicesUtils.getClusterHighAvailableStoragePath(configuration);

        final FileSystem haFileSystem;
        try {
            haFileSystem = haStoragePath.getFileSystem();
        } catch (Exception cause) {
            // Any failure while resolving the file system is reported as an IOException with the path attached.
            final String message = String.format("Could not create FileSystem for highly available storage path (%s)", haStoragePath);
            throw new IOException(message, cause);
        }

        final String storageUri = haStoragePath.toUri().toString();
        return new FileSystemBlobStore(haFileSystem, storageUri);
    }

    /**
     * Creates a {@link BlobServer} whose storage directory is resolved from the configuration, falling
     * back to the given directory.
     *
     * @param configuration configuration handed to the server and used to resolve the storage directory
     * @param fallbackStorageDirectory directory to use when no storage directory is configured
     * @param blobStore blob store handed to the server
     * @return the newly constructed blob server
     * @throws IOException if the storage directory cannot be created or the server constructor fails
     */
    public static BlobServer createBlobServer(Configuration configuration, Reference<File> fallbackStorageDirectory, BlobStore blobStore) throws IOException {
        return new BlobServer(
                configuration,
                createBlobStorageDirectory(configuration, fallbackStorageDirectory),
                blobStore);
    }

    /**
     * Creates a {@link BlobCacheService} whose storage directory is resolved from the configuration,
     * falling back to the given directory.
     *
     * @param configuration configuration handed to the cache and used to resolve the storage directory
     * @param fallbackStorageDirectory directory to use when no storage directory is configured
     * @param blobView blob view handed to the cache service
     * @param serverAddress address handed to the cache service; may be {@code null}
     * @return the newly constructed blob cache service
     * @throws IOException if the storage directory cannot be created or the service constructor fails
     */
    public static BlobCacheService createBlobCacheService(Configuration configuration, Reference<File> fallbackStorageDirectory, BlobView blobView, @Nullable InetSocketAddress serverAddress) throws IOException {
        return new BlobCacheService(
                configuration,
                createBlobStorageDirectory(configuration, fallbackStorageDirectory),
                blobView,
                serverAddress);
    }

    /**
     * Resolves and creates the local directory in which BLOBs are stored.
     *
     * <p>If {@link BlobServerOptions#STORAGE_DIRECTORY} is configured, a fresh
     * {@code blobStore-<uuid>} sub-directory is created below it and returned as an owned reference.
     * Otherwise the fallback directory is created if necessary and the given reference is returned
     * unchanged.
     *
     * @param configuration configuration that may specify the base storage directory
     * @param fallbackStorageDirectory directory to use when nothing is configured; may be {@code null}
     * @return reference to the storage directory
     * @throws IOException if no directory is available or the directory could not be created
     */
    static Reference<File> createBlobStorageDirectory(Configuration configuration, @Nullable Reference<File> fallbackStorageDirectory) throws IOException {
        final String basePath = configuration.get(BlobServerOptions.STORAGE_DIRECTORY);

        File baseDir = null;
        if (StringUtils.isNullOrWhitespaceOnly(basePath)) {
            if (fallbackStorageDirectory != null) {
                baseDir = fallbackStorageDirectory.deref();
                // mkdirs() returns false for a directory that already exists, hence the extra check.
                if (baseDir.mkdirs() || baseDir.exists()) {
                    return fallbackStorageDirectory;
                }
            }
        } else {
            baseDir = new File(basePath);

            // Retry with a new random name; only a directory that this call created is returned.
            final int maxAttempts = 10;
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                final File storageDir = new File(baseDir, String.format("blobStore-%s", UUID.randomUUID()));
                if (storageDir.mkdirs()) {
                    return Reference.owned(storageDir);
                }
            }
        }

        if (baseDir != null) {
            throw new IOException("Could not create storage directory for BLOB store in '" + baseDir + "'.");
        }
        throw new IOException(String.format("Could not create storage directory for BLOB store because no storage directory has been specified under %s and no fallback storage directory provided.", BlobServerOptions.STORAGE_DIRECTORY.key()));
    }

    /**
     * Returns the {@code incoming} sub-directory of the given storage directory, creating it (and any
     * missing parents) if necessary.
     *
     * @param storageDir storage directory that contains the {@code incoming} directory
     * @return the {@code incoming} directory
     * @throws IOException if the directory cannot be created
     */
    static File getIncomingDirectory(File storageDir) throws IOException {
        final File incoming = new File(storageDir, "incoming");
        final java.nio.file.Path incomingPath = incoming.toPath();
        Files.createDirectories(incomingPath);
        return incoming;
    }

    /**
     * Returns the file under which the given BLOB is stored and makes sure its parent directory
     * exists.
     *
     * @param storageDir storage directory used by the caller
     * @param jobId ID of the job the BLOB belongs to, or {@code null} if it is not job-related
     * @param key key of the BLOB
     * @return the file for the BLOB (the file itself is not created here)
     * @throws IOException if the parent directory cannot be created
     */
    static File getStorageLocation(File storageDir, @Nullable JobID jobId, BlobKey key) throws IOException {
        final String locationPath = getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key);
        final File location = new File(locationPath);
        final File parentDirectory = location.getParentFile();
        Files.createDirectories(parentDirectory.toPath());
        return location;
    }

    /**
     * Builds the path of the directory that holds the BLOBs of the given job.
     *
     * @param storageDir storage directory used by the caller
     * @param jobId ID of the job, or {@code null} to address the directory for job-unrelated BLOBs
     * @return {@code <storageDir>/no_job} if {@code jobId} is {@code null}, otherwise
     *     {@code <storageDir>/job_<jobId>}
     */
    static String getStorageLocationPath(String storageDir, @Nullable JobID jobId) {
        final String jobDirectory = jobId == null
                ? NO_JOB_DIR_PREFIX
                : JOB_DIR_PREFIX + jobId.toString();
        return storageDir + "/" + jobDirectory;
    }

    /**
     * Builds the path of the file that holds the given BLOB.
     *
     * @param storageDir storage directory used by the caller
     * @param jobId ID of the job the BLOB belongs to, or {@code null} if it is not job-related
     * @param key key of the BLOB
     * @return {@code <storageDir>/no_job/blob_<key>} if {@code jobId} is {@code null}, otherwise
     *     {@code <storageDir>/job_<jobId>/blob_<key>}
     */
    static String getStorageLocationPath(String storageDir, @Nullable JobID jobId, BlobKey key) {
        // The job directory layout is defined once, by the two-argument overload.
        final String jobDirectoryPath = getStorageLocationPath(storageDir, jobId);
        final String blobFileName = BLOB_FILE_PREFIX + key.toString();
        return jobDirectoryPath + "/" + blobFileName;
    }

    /**
     * Creates a new {@link MessageDigest} instance for the hashing algorithm used for BLOBs.
     *
     * @return a fresh message digest
     * @throws RuntimeException if the algorithm is not available in this JVM
     */
    static MessageDigest createMessageDigest() {
        final MessageDigest digest;
        try {
            digest = MessageDigest.getInstance(HASHING_ALGORITHM);
        } catch (NoSuchAlgorithmException missingAlgorithm) {
            throw new RuntimeException("Cannot instantiate the message digest algorithm SHA-1", missingAlgorithm);
        }
        return digest;
    }

    /**
     * Writes the given length to the stream as four bytes, least significant byte first.
     *
     * @param length value to encode
     * @param outputStream stream that receives the four bytes in a single write call
     * @throws IOException if writing to the stream fails
     */
    static void writeLength(int length, OutputStream outputStream) throws IOException {
        final byte[] encoded = new byte[4];
        for (int i = 0; i < encoded.length; i++) {
            // The narrowing cast keeps exactly the low eight bits of the shifted value.
            encoded[i] = (byte) (length >>> (8 * i));
        }
        outputStream.write(encoded, 0, encoded.length);
    }

    /**
     * Reads four bytes from the stream and decodes them as an {@code int}, least significant byte
     * first.
     *
     * @param inputStream stream to read from
     * @return the decoded value
     * @throws EOFException if the stream ends before four bytes were read
     * @throws IOException if reading from the stream fails
     */
    static int readLength(InputStream inputStream) throws IOException {
        final byte[] raw = new byte[4];
        int filled = 0;
        // A single read may return fewer bytes than requested, so keep reading until the buffer is full.
        while (filled < raw.length) {
            final int count = inputStream.read(raw, filled, raw.length - filled);
            if (count < 0) {
                throw new EOFException("Read an incomplete length");
            }
            filled += count;
        }

        int length = 0;
        for (int i = raw.length - 1; i >= 0; i--) {
            length = (length << 8) | (raw[i] & 0xFF);
        }
        return length;
    }

    /**
     * Reads a length-prefixed, serialized {@link Throwable} from the given stream.
     *
     * @param in stream to read the length prefix and the serialized bytes from
     * @return the deserialized throwable
     * @throws IOException if the length prefix is negative, the stream ends prematurely, reading
     *     fails, or the class of the serialized object cannot be found
     */
    static Throwable readExceptionFromStream(InputStream in) throws IOException {
        final int len = BlobUtils.readLength(in);
        // A corrupt length prefix would otherwise surface as an unchecked NegativeArraySizeException.
        if (len < 0) {
            throw new IOException("Received an invalid (negative) error message length: " + len);
        }

        final byte[] bytes = new byte[len];
        BlobUtils.readFully(in, bytes, 0, len, "Error message");

        try {
            // NOTE(review): this deserializes bytes received from the stream with Java serialization;
            // confirm that the peer is trusted.
            return (Throwable) InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IOException("Could not transfer error message", e);
        }
    }

    /**
     * Reads exactly {@code len} bytes from the stream into {@code buf}, starting at offset
     * {@code off}.
     *
     * @param inputStream stream to read from
     * @param buf destination buffer
     * @param off offset in {@code buf} at which the first byte is stored
     * @param len number of bytes to read
     * @param type description of the data, used only in the error message
     * @throws EOFException if the stream ends before {@code len} bytes were read
     * @throws IOException if reading from the stream fails
     */
    static void readFully(InputStream inputStream, byte[] buf, int off, int len, String type) throws IOException {
        int filled = 0;
        while (filled < len) {
            final int count = inputStream.read(buf, off + filled, len - filled);
            if (count < 0) {
                throw new EOFException("Received an incomplete " + type);
            }
            filled += count;
        }
    }

    /**
     * Closes the given socket and logs, rather than propagates, anything thrown while doing so.
     *
     * @param socket socket to close; {@code null} is ignored
     * @param log logger that receives a debug message if closing fails
     */
    static void closeSilently(Socket socket, Logger log) {
        if (socket == null) {
            return;
        }
        try {
            socket.close();
        } catch (Throwable closeFailure) {
            log.debug("Exception while closing BLOB server connection socket.", closeFailure);
        }
    }

    /** Private constructor; it always throws, so this class cannot be instantiated. */
    private BlobUtils() {
        throw new RuntimeException();
    }

    /**
     * Moves the staged file to its final storage location, using {@link Files#move} without any
     * copy options as the move operation.
     *
     * @param incomingFile staged file to move
     * @param jobId ID of the job the BLOB belongs to, or {@code null} if it is not job-related
     * @param blobKey key of the BLOB
     * @param storageFile final location of the BLOB
     * @param log logger used for warnings
     * @param blobStore blob store that also receives the BLOB; may be {@code null}
     * @throws IOException if storing or moving the file fails
     */
    static void moveTempFileToStore(File incomingFile, @Nullable JobID jobId, BlobKey blobKey, File storageFile, Logger log, @Nullable BlobStore blobStore) throws IOException {
        final MoveFileOperation plainMove = (source, target) -> {
            Files.move(source.toPath(), target.toPath());
        };
        internalMoveTempFileToStore(incomingFile, jobId, blobKey, storageFile, log, blobStore, plainMove);
    }

    /**
     * Moves the staged file to its final storage location using the given move operation.
     *
     * <p>If {@code storageFile} does not exist yet, the staged file is first handed to
     * {@code blobStore.put} (when a blob store is given) and then moved. If the storage file already
     * exists, a warning is logged and the upload is ignored. When the method fails, the BLOB is
     * deleted from the blob store and the storage file is removed. The staged file is deleted
     * whenever it was not moved.
     *
     * @param incomingFile staged file to move
     * @param jobId ID of the job the BLOB belongs to, or {@code null} if it is not job-related
     * @param blobKey key of the BLOB
     * @param storageFile final location of the BLOB
     * @param log logger used for warnings
     * @param blobStore blob store that also receives the BLOB; may be {@code null}
     * @param moveFileOperation operation that moves the staged file to the storage file
     * @throws IOException if storing or moving the file fails
     */
    @VisibleForTesting
    static void internalMoveTempFileToStore(File incomingFile, @Nullable JobID jobId, BlobKey blobKey, File storageFile, Logger log, @Nullable BlobStore blobStore, MoveFileOperation moveFileOperation) throws IOException {
        // Tracks the staged file that still has to be cleaned up; cleared once it has been moved.
        File stagingFile = incomingFile;
        boolean success = false;
        try {
            if (!storageFile.exists()) {
                if (blobStore != null) {
                    blobStore.put(stagingFile, jobId, blobKey);
                }

                try {
                    moveFileOperation.moveFile(stagingFile, storageFile);
                    stagingFile = null;
                } catch (FileAlreadyExistsException ignored) {
                    // Another writer created the storage file first; its copy is kept and ours is dropped below.
                    log.warn("Detected concurrent file modifications. This should only happen if multiple BlobServer use the same storage directory.");
                }
            } else {
                log.warn("File upload for an existing file with key {} for job {}. This may indicate a duplicate upload or a hash collision. Ignoring newest upload.", blobKey, jobId);
            }

            success = true;
        } finally {
            if (!success) {
                // Roll back whatever was stored so far.
                if (blobStore != null) {
                    blobStore.delete(jobId, blobKey);
                }

                if (!storageFile.delete() && storageFile.exists()) {
                    log.warn("Could not delete the storage file {}.", storageFile);
                }
            }

            if (stagingFile != null && !stagingFile.delete() && stagingFile.exists()) {
                log.warn("Could not delete the staging file {} for blob key {} and job {}.", stagingFile, blobKey, jobId);
            }
        }
    }

    /**
     * Computes the message digest of the given file's content.
     *
     * @param file file whose content is hashed
     * @return the digest bytes
     * @throws IOException if the file cannot be opened or read
     */
    public static byte[] calculateMessageDigest(File file) throws IOException {
        final MessageDigest digest = createMessageDigest();
        final byte[] chunk = new byte[4096];

        try (InputStream input = new FileInputStream(file)) {
            while (true) {
                final int count = input.read(chunk, 0, chunk.length);
                if (count == -1) {
                    break;
                }
                digest.update(chunk, 0, count);
            }
        }

        return digest.digest();
    }

    /**
     * Recomputes the digest of every BLOB file found under the given directory and deletes the files
     * whose digest does not match the hash of their key.
     *
     * @param storageDir directory that is scanned for BLOB files
     * @param log logger that reports corrupted BLOBs and failed deletions
     * @throws IOException if listing the directory or hashing a file fails
     */
    static void checkAndDeleteCorruptedBlobs(java.nio.file.Path storageDir, Logger log) throws IOException {
        for (Blob<?> candidate : listBlobsInDirectory(storageDir)) {
            final BlobKey expectedKey = candidate.getBlobKey();
            final java.nio.file.Path location = candidate.getPath();
            final byte[] actualHash = calculateMessageDigest(location.toFile());

            if (!Arrays.equals(expectedKey.getHash(), actualHash)) {
                log.info("Found corrupted blob {} under {}. Deleting this blob.", expectedKey, location);
                try {
                    FileUtils.deleteFileOrDirectory(location.toFile());
                } catch (IOException deleteFailure) {
                    // A failed deletion is only logged; the remaining BLOBs are still checked.
                    log.debug("Could not delete the blob {}.", location, deleteFailure);
                }
            }
        }
    }

    /**
     * Lists the files under the given directory whose file name starts with the BLOB file prefix.
     *
     * @param directory directory to scan
     * @return paths of the matching files
     * @throws IOException if listing the directory fails
     */
    @Nonnull
    static Collection<java.nio.file.Path> listBlobFilesInDirectory(java.nio.file.Path directory) throws IOException {
        return FileUtils.listFilesInDirectory(directory, candidate -> {
            final String fileName = candidate.getFileName().toString();
            return fileName.startsWith(BLOB_FILE_PREFIX);
        });
    }

    /**
     * Lists all BLOBs under the given directory, reconstructing each BLOB's key and job ID from its
     * file name and the name of its parent directory.
     *
     * @param directory directory to scan
     * @return the BLOBs found
     * @throws IOException if listing the directory fails
     * @throws IllegalStateException if a parent directory name or a key type is not recognised
     */
    @Nonnull
    static Collection<Blob<?>> listBlobsInDirectory(java.nio.file.Path directory) throws IOException {
        return listBlobFilesInDirectory(directory).stream()
                .map(BlobUtils::toBlob)
                .collect(Collectors.toList());
    }

    /** Builds the {@link Blob} description for a single BLOB file from its path. */
    private static Blob<?> toBlob(java.nio.file.Path blobPath) {
        // The key is encoded in the file name, right after the BLOB file prefix.
        final String fileName = blobPath.getFileName().toString();
        final BlobKey blobKey = BlobKey.fromString(fileName.substring(BLOB_FILE_PREFIX.length()));

        // The job is encoded in the name of the directory that contains the file.
        final String jobDirectory = blobPath.getParent().getFileName().toString();
        final JobID jobId;
        if (jobDirectory.equals(NO_JOB_DIR_PREFIX)) {
            jobId = null;
        } else if (jobDirectory.startsWith(JOB_DIR_PREFIX)) {
            final String hexJobId = jobDirectory.substring(JOB_DIR_PREFIX.length());
            jobId = new JobID(StringUtils.hexStringToByte(hexJobId));
        } else {
            throw new IllegalStateException(String.format("Unknown job path %s.", jobDirectory));
        }

        if (blobKey instanceof TransientBlobKey) {
            return new TransientBlob((TransientBlobKey) blobKey, blobPath, jobId);
        }
        if (blobKey instanceof PermanentBlobKey) {
            return new PermanentBlob((PermanentBlobKey) blobKey, blobPath, jobId);
        }
        throw new IllegalStateException(String.format("Unknown blob key format %s.", blobKey.getClass()));
    }

    /**
     * Lists the BLOBs under the given directory whose key is a {@link TransientBlobKey}.
     *
     * @param directory directory to scan
     * @return the transient BLOBs found
     * @throws IOException if listing the directory fails
     */
    @Nonnull
    static Collection<TransientBlob> listTransientBlobsInDirectory(java.nio.file.Path directory) throws IOException {
        final Collection<Blob<?>> allBlobs = listBlobsInDirectory(directory);
        return allBlobs.stream()
                .filter(candidate -> candidate.getBlobKey() instanceof TransientBlobKey)
                .map(TransientBlob.class::cast)
                .collect(Collectors.toList());
    }

    /**
     * Lists the BLOBs under the given directory whose key is a {@link PermanentBlobKey}.
     *
     * @param directory directory to scan
     * @return the permanent BLOBs found
     * @throws IOException if listing the directory fails
     */
    @Nonnull
    static Collection<PermanentBlob> listPermanentBlobsInDirectory(java.nio.file.Path directory) throws IOException {
        final Collection<Blob<?>> allBlobs = listBlobsInDirectory(directory);
        return allBlobs.stream()
                .filter(candidate -> candidate.getBlobKey() instanceof PermanentBlobKey)
                .map(PermanentBlob.class::cast)
                .collect(Collectors.toList());
    }

    /**
     * Collects the IDs of all jobs that have at least one BLOB under the given directory. BLOBs
     * without a job ID are skipped.
     *
     * @param directory directory to scan
     * @return the distinct job IDs found
     * @throws IOException if listing the directory fails
     */
    static Set<JobID> listExistingJobs(java.nio.file.Path directory) throws IOException {
        final Collection<Blob<?>> allBlobs = listBlobsInDirectory(directory);
        return allBlobs.stream()
                .map(blob -> blob.getJobId())
                .filter(jobId -> jobId != null)
                .collect(Collectors.toSet());
    }

    /** A {@link Blob} that is identified by a {@link PermanentBlobKey}. */
    static final class PermanentBlob extends Blob<PermanentBlobKey> {

        /**
         * @param blobKey key of the BLOB
         * @param path location of the BLOB file
         * @param jobId ID of the owning job, or {@code null} if the BLOB is not job-related
         */
        PermanentBlob(PermanentBlobKey blobKey, java.nio.file.Path path, @Nullable JobID jobId) {
            super(blobKey, path, jobId);
        }
    }

    /** A {@link Blob} that is identified by a {@link TransientBlobKey}. */
    static final class TransientBlob extends Blob<TransientBlobKey> {

        /**
         * @param blobKey key of the BLOB
         * @param path location of the BLOB file
         * @param jobId ID of the owning job, or {@code null} if the BLOB is not job-related
         */
        TransientBlob(TransientBlobKey blobKey, java.nio.file.Path path, @Nullable JobID jobId) {
            super(blobKey, path, jobId);
        }
    }

    /**
     * Immutable description of a BLOB file: its key, its location, and the job it belongs to (if
     * any).
     *
     * @param <T> type of the BLOB key
     */
    abstract static class Blob<T extends BlobKey> {

        /** Key of the BLOB. */
        private final T blobKey;

        /** Location of the BLOB file. */
        private final java.nio.file.Path path;

        /** ID of the owning job; {@code null} if the BLOB is not job-related. */
        @Nullable
        private final JobID jobId;

        /**
         * @param blobKey key of the BLOB
         * @param path location of the BLOB file
         * @param jobId ID of the owning job, or {@code null} if the BLOB is not job-related
         */
        Blob(final T blobKey, final java.nio.file.Path path, @Nullable final JobID jobId) {
            this.blobKey = blobKey;
            this.path = path;
            this.jobId = jobId;
        }

        /** Returns the key of this BLOB. */
        public T getBlobKey() {
            return blobKey;
        }

        /** Returns the location of the BLOB file. */
        public java.nio.file.Path getPath() {
            return path;
        }

        /** Returns the ID of the owning job, or {@code null} if the BLOB is not job-related. */
        @Nullable
        public JobID getJobId() {
            return jobId;
        }
    }

    /** Strategy for moving a staged BLOB file to its final storage location. */
    @FunctionalInterface
    interface MoveFileOperation {

        /**
         * Moves {@code source} to {@code target}.
         *
         * @param source file to move
         * @param target destination of the move
         * @throws IOException if the move fails
         */
        void moveFile(File source, File target) throws IOException;
    }
}

