/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.blob.BlobUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Local, file-based cache for BLOBs that are served by a remote BLOB server.
 *
 * <p>A requested BLOB is looked up in the local storage directory first. Only when no local
 * copy exists is it downloaded from the server at the configured address, with a configurable
 * number of retries. {@link #shutdown()} deletes the storage directory.
 */
public final class BlobCache
implements BlobService {
    private static final Logger LOG = LoggerFactory.getLogger(BlobCache.class);

    /** Size in bytes of the buffer used to copy a BLOB from the server to the local file. */
    private static final int TRANSFER_BUFFER_SIZE = 65536;

    /** Address of the BLOB server that missing BLOBs are downloaded from. */
    private final InetSocketAddress serverAddress;

    /** Root directory under which downloaded BLOBs are stored. */
    private final File storageDir;

    /** Set to {@code true} by the first call to {@link #shutdown()}; later calls are no-ops. */
    private final AtomicBoolean shutdownRequested = new AtomicBoolean();

    /** Hook obtained from {@code BlobUtils.addShutdownHook}; {@link #shutdown()} tolerates {@code null}. */
    private final Thread shutdownHook;

    /** Number of additional download attempts after the first one has failed (never negative). */
    private final int numFetchRetries;

    /**
     * Creates a cache that downloads missing BLOBs from the given server.
     *
     * <p>Reads {@code blob.storage.directory} (no default) and {@code blob.fetch.retries}
     * (default 5) from the configuration. A negative retry count is replaced by 0.
     *
     * @param serverAddress address of the BLOB server
     * @param configuration configuration to read the cache settings from
     * @throws NullPointerException if either argument is {@code null}
     */
    public BlobCache(InetSocketAddress serverAddress, Configuration configuration) {
        if (serverAddress == null || configuration == null) {
            throw new NullPointerException();
        }
        this.serverAddress = serverAddress;
        String storageDirectory = configuration.getString("blob.storage.directory", null);
        this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
        LOG.info("Created BLOB cache storage directory " + this.storageDir);
        int fetchRetries = configuration.getInteger("blob.fetch.retries", 5);
        if (fetchRetries >= 0) {
            this.numFetchRetries = fetchRetries;
        } else {
            LOG.warn("Invalid value for {}. System will attempt no retires on failed fetches of BLOBs.", "blob.fetch.retries");
            this.numFetchRetries = 0;
        }
        this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
    }

    /**
     * Returns a URL pointing to the local copy of the given BLOB, downloading it first if no
     * local copy exists.
     *
     * <p>A failed download is retried up to {@code numFetchRetries} times. A file that was only
     * partially written by a failed attempt is removed, so that a later call does not mistake
     * it for a complete local copy.
     *
     * @param requiredBlob key of the BLOB to resolve
     * @return URL of the local file holding the BLOB
     * @throws IllegalArgumentException if {@code requiredBlob} is {@code null}
     * @throws IOException if the BLOB could not be downloaded and no retries are left
     */
    @Override
    public URL getURL(BlobKey requiredBlob) throws IOException {
        if (requiredBlob == null) {
            throw new IllegalArgumentException("BLOB key cannot be null.");
        }
        File localJarFile = BlobUtils.getStorageLocation(this.storageDir, requiredBlob);
        if (!localJarFile.exists()) {
            // One buffer is shared by all attempts of this call.
            byte[] buf = new byte[TRANSFER_BUFFER_SIZE];
            int attempt = 0;
            while (true) {
                if (attempt == 0) {
                    LOG.info("Downloading {} from {}", requiredBlob, this.serverAddress);
                } else {
                    LOG.info("Downloading {} from {} (retry {})", new Object[]{requiredBlob, this.serverAddress, attempt});
                }
                try {
                    this.download(requiredBlob, localJarFile, buf);
                    break;
                }
                catch (IOException e) {
                    String message = "Failed to fetch BLOB " + requiredBlob + " from " + this.serverAddress + " and store it under " + localJarFile.getAbsolutePath();
                    if (attempt >= this.numFetchRetries) {
                        LOG.error(message + " No retries left.", e);
                        throw new IOException(message, e);
                    }
                    ++attempt;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(message + " Retrying...", e);
                    } else {
                        LOG.error(message + " Retrying...");
                    }
                }
            }
        }
        return localJarFile.toURI().toURL();
    }

    /**
     * Performs a single download attempt of the given BLOB into {@code target}.
     *
     * <p>On failure all opened resources are closed quietly. If the file was opened but not
     * completely written and closed, it is deleted so no truncated file is left behind.
     *
     * @param requiredBlob key of the BLOB to download
     * @param target local file to write the BLOB to
     * @param buf scratch buffer used for copying
     * @throws IOException if the attempt failed; a non-I/O failure is wrapped as the cause
     */
    private void download(BlobKey requiredBlob, File target, byte[] buf) throws IOException {
        BlobClient bc = null;
        InputStream is = null;
        OutputStream os = null;
        // True from the moment the file is opened until its content is fully written and closed.
        boolean incomplete = false;
        try {
            bc = new BlobClient(this.serverAddress);
            is = bc.get(requiredBlob);
            os = new FileOutputStream(target);
            incomplete = true;
            int read;
            while ((read = is.read(buf)) >= 0) {
                os.write(buf, 0, read);
            }
            // Close explicitly (instead of quietly) so that close failures count as a failed attempt.
            os.close();
            os = null;
            incomplete = false;
            is.close();
            is = null;
            bc.close();
            bc = null;
        }
        catch (Throwable t) {
            // Catching Throwable keeps the root failure; errors raised while closing are only logged.
            this.closeSilently(os);
            this.closeSilently(is);
            this.closeSilently(bc);
            if (incomplete && target.exists() && !target.delete()) {
                LOG.warn("Failed to delete incompletely downloaded BLOB file " + target.getAbsolutePath());
            }
            if (t instanceof IOException) {
                throw (IOException)t;
            }
            throw new IOException(t.getMessage(), t);
        }
    }

    /**
     * Deletes the locally cached copy of the given BLOB, if one exists.
     *
     * <p>A failed deletion is logged as a warning and not reported to the caller.
     *
     * @param key key of the BLOB whose local copy should be removed
     * @throws IOException declared by the {@code BlobService} contract; not thrown by the code
     *         visible in this method itself
     */
    @Override
    public void delete(BlobKey key) throws IOException {
        File localFile = BlobUtils.getStorageLocation(this.storageDir, key);
        if (localFile.exists() && !localFile.delete()) {
            LOG.warn("Failed to delete locally cached BLOB " + key + " at " + localFile.getAbsolutePath());
        }
    }

    /**
     * Returns the port of the BLOB server address this cache downloads from.
     *
     * @return port of the configured server address
     */
    @Override
    public int getPort() {
        return this.serverAddress.getPort();
    }

    /**
     * Shuts the cache down: deletes the storage directory and unregisters the shutdown hook.
     *
     * <p>Only the first invocation has an effect. Failures are logged and never thrown.
     */
    @Override
    public void shutdown() {
        if (!this.shutdownRequested.compareAndSet(false, true)) {
            return;
        }
        LOG.info("Shutting down BlobCache");
        try {
            FileUtils.deleteDirectory(this.storageDir);
        }
        catch (IOException e) {
            LOG.error("BLOB cache failed to properly clean up its storage directory.", e);
        }
        // Skip unregistering when this method is executed by the hook thread itself.
        if (this.shutdownHook != null && this.shutdownHook != Thread.currentThread()) {
            try {
                Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
            }
            catch (IllegalStateException ignored) {
                // The JVM is already shutting down; the hook can no longer be removed.
            }
            catch (Throwable t) {
                LOG.warn("Exception while unregistering BLOB cache's cleanup shutdown hook.", t);
            }
        }
    }

    /**
     * Returns the root directory in which this cache stores its files.
     *
     * @return the storage directory
     */
    public File getStorageDir() {
        return this.storageDir;
    }

    /**
     * Closes the given resource, logging (at debug level) instead of propagating any failure.
     *
     * @param closeable resource to close; {@code null} is ignored
     */
    private void closeSilently(Closeable closeable) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        }
        catch (Throwable t) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Error while closing resource after BLOB transfer.", t);
            }
        }
    }
}

