/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.Reference;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.util.function.FunctionUtils;
import org.slf4j.Logger;

/**
 * Base class for local BLOB caches.
 *
 * <p>BLOB files are kept under {@link #storageDir}. A requested BLOB that is not yet present
 * locally is first requested from the {@link BlobView}; if that does not yield the file, it is
 * downloaded from the BLOB server at {@link #serverAddress}. Downloads go to a staging file which
 * is then moved to its final location while holding the write lock of {@link #readWriteLock}.
 */
public abstract class AbstractBlobCache implements Closeable {
    /** Logger supplied by the creator of this cache; used for all messages of this class. */
    protected final Logger log;

    /** Counter used by {@link #createTemporaryFilename()} to number staging files. */
    protected final AtomicLong tempFileCounter = new AtomicLong(0L);

    /** Reference to the root directory under which BLOB files are stored. */
    protected final Reference<File> storageDir;

    /** View that is queried first when a BLOB is not available locally. */
    protected final BlobView blobView;

    /** Set to {@code true} by the first call to {@link #close()} that performs the shutdown. */
    protected final AtomicBoolean shutdownRequested = new AtomicBoolean();

    /** Shutdown hook registered in the constructor and removed again in {@link #close()}. */
    protected final Thread shutdownHook;

    /** Number of retries passed to the BLOB server download; never negative. */
    protected final int numFetchRetries;

    /** Configuration handed to the BLOB client when downloading from the BLOB server. */
    protected final Configuration blobClientConfig;

    /**
     * Lock guarding the local store: the read lock is held while checking for an existing file,
     * the write lock while moving a staging file into the store.
     */
    protected final ReadWriteLock readWriteLock;

    /** Address of the BLOB server, or {@code null} while it is unknown. */
    @Nullable
    protected volatile InetSocketAddress serverAddress;

    /**
     * Creates the cache, registers a shutdown hook for it, and checks the already stored BLOBs
     * for corruption.
     *
     * @param blobClientConfig configuration used for BLOB server downloads; also supplies the
     *     number of fetch retries
     * @param storageDir reference to the local storage directory
     * @param blobView view that is consulted before falling back to the BLOB server
     * @param logger logger to use
     * @param serverAddress address of the BLOB server, or {@code null} if not known yet
     * @throws IOException if checking the stored BLOBs for corruption fails
     */
    public AbstractBlobCache(Configuration blobClientConfig, Reference<File> storageDir, BlobView blobView, Logger logger, @Nullable InetSocketAddress serverAddress) throws IOException {
        this.log = Preconditions.checkNotNull(logger);
        this.blobClientConfig = Preconditions.checkNotNull(blobClientConfig);
        this.blobView = Preconditions.checkNotNull(blobView);
        this.readWriteLock = new ReentrantReadWriteLock();
        this.storageDir = storageDir;
        this.log.info("Created BLOB cache storage directory {}", storageDir);

        // A negative retry count is not usable; fall back to "no retries" and say so.
        int fetchRetries = blobClientConfig.get(BlobServerOptions.FETCH_RETRIES);
        if (fetchRetries >= 0) {
            this.numFetchRetries = fetchRetries;
        } else {
            this.log.warn("Invalid value for {}. System will attempt no retries on failed fetch operations of BLOBs.", BlobServerOptions.FETCH_RETRIES.key());
            this.numFetchRetries = 0;
        }

        this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, this.getClass().getSimpleName(), this.log);
        this.serverAddress = serverAddress;
        this.checkStoredBlobsForCorruption();
    }

    /**
     * Runs the corrupted-BLOB check on the storage directory, if that directory exists.
     *
     * @throws IOException if the check fails
     */
    private void checkStoredBlobsForCorruption() throws IOException {
        if (this.storageDir.deref().exists()) {
            BlobUtils.checkAndDeleteCorruptedBlobs(this.storageDir.deref().toPath(), this.log);
        }
    }

    /**
     * Returns the local storage directory.
     *
     * @return the directory the storage reference points to
     */
    public File getStorageDir() {
        return this.storageDir.deref();
    }

    /**
     * Returns the local file for the given BLOB, fetching it first if it is not present.
     *
     * <p>If the file does not exist locally, it is requested from the {@link BlobView} into a
     * staging file. If the view does not provide it (it returns {@code false} or fails with an
     * exception), the BLOB is downloaded from the BLOB server instead. In both cases the staging
     * file is moved into the store under the write lock.
     *
     * <p>The staging file is cleaned up on every exit path once it has been named, including
     * failures of the BLOB server download and an unknown server address.
     *
     * @param jobId ID of the job the BLOB belongs to, or {@code null}
     * @param blobKey key of the BLOB; must not be {@code null}
     * @return the file in the local store
     * @throws IOException if the server address is unknown, or if downloading or storing the
     *     BLOB fails
     */
    protected File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOException {
        Preconditions.checkArgument(blobKey != null, "BLOB key cannot be null.");

        final File localFile = BlobUtils.getStorageLocation(this.storageDir.deref(), jobId, blobKey);
        this.readWriteLock.readLock().lock();
        try {
            if (localFile.exists()) {
                return localFile;
            }
        } finally {
            this.readWriteLock.readLock().unlock();
        }

        // Each request downloads into its own uniquely numbered staging file.
        final File incomingFile = this.createTemporaryFilename();
        try {
            // First choice: the blob view. Any failure here is logged and triggers the fallback.
            try {
                if (this.blobView.get(jobId, blobKey, incomingFile)) {
                    this.moveStagedFileToStore(incomingFile, jobId, blobKey, localFile);
                    return localFile;
                }
            } catch (Exception e) {
                this.log.info("Failed to copy from blob store. Downloading from BLOB server instead.", e);
            }

            // Fallback: download from the BLOB server. Read the volatile field only once.
            final InetSocketAddress currentServerAddress = this.serverAddress;
            if (currentServerAddress == null) {
                throw new IOException("Cannot download from BlobServer, because the server address is unknown.");
            }
            BlobClient.downloadFromBlobServer(jobId, blobKey, incomingFile, currentServerAddress, this.blobClientConfig, this.numFetchRetries);
            this.moveStagedFileToStore(incomingFile, jobId, blobKey, localFile);
            return localFile;
        } finally {
            // Remove whatever is left of the staging file, on success and on failure alike.
            // A failed delete is only worth a warning if the file is actually still there.
            if (!incomingFile.delete() && incomingFile.exists()) {
                this.log.warn("Could not delete the staging file {} for blob key {} and job {}.", incomingFile, blobKey, jobId);
            }
        }
    }

    /**
     * Moves a downloaded staging file to its final location while holding the write lock.
     *
     * @throws IOException if the move fails
     */
    private void moveStagedFileToStore(File incomingFile, @Nullable JobID jobId, BlobKey blobKey, File localFile) throws IOException {
        this.readWriteLock.writeLock().lock();
        try {
            BlobUtils.moveTempFileToStore(incomingFile, jobId, blobKey, localFile, this.log, null);
        } finally {
            this.readWriteLock.writeLock().unlock();
        }
    }

    /**
     * Returns the port of the BLOB server this cache is connected to.
     *
     * @return the port of the configured server address, or {@code -1} if no address is set
     */
    public int getPort() {
        // Read the volatile field once so the null check and the use see the same value.
        InetSocketAddress currentServerAddress = this.serverAddress;
        if (currentServerAddress != null) {
            return currentServerAddress.getPort();
        }
        return -1;
    }

    /**
     * Sets the address of the BLOB server to download from.
     *
     * @param blobServerAddress new server address; checked with
     *     {@code Preconditions.checkNotNull}
     */
    public void setBlobServerAddress(InetSocketAddress blobServerAddress) {
        this.serverAddress = Preconditions.checkNotNull(blobServerAddress);
    }

    /**
     * Returns a new staging file name of the form {@code temp-NNNNNNNN} inside the incoming
     * directory of the storage directory, using the next value of {@link #tempFileCounter}.
     *
     * @return a file object for the next staging file
     * @throws IOException if the incoming directory cannot be provided
     */
    File createTemporaryFilename() throws IOException {
        return new File(BlobUtils.getIncomingDirectory(this.storageDir.deref()), String.format("temp-%08d", this.tempFileCounter.getAndIncrement()));
    }

    /**
     * Shuts the cache down.
     *
     * <p>{@link #cancelCleanupTask()} is invoked on every call. The remaining shutdown steps run
     * only for the first caller that flips {@link #shutdownRequested}: the storage directory is
     * deleted if the storage reference reports it as owned, and the shutdown hook is removed
     * afterwards regardless of whether the deletion succeeded.
     */
    @Override
    public void close() throws IOException {
        this.cancelCleanupTask();
        if (this.shutdownRequested.compareAndSet(false, true)) {
            this.log.info("Shutting down BLOB cache");
            try {
                this.storageDir.owned().ifPresent(FunctionUtils.uncheckedConsumer(FileUtils::deleteDirectory));
            }
            finally {
                ShutdownHookUtil.removeShutdownHook(this.shutdownHook, this.getClass().getSimpleName(), this.log);
            }
        }
    }

    /** Cancels the cleanup task of the concrete cache; called at the start of {@link #close()}. */
    protected abstract void cancelCleanupTask();
}

