package org.apache.storm.localizer;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/localizer/LocallyCachedBlob.class */
public abstract class LocallyCachedBlob {
    // Sentinel version value; judging by its name it denotes "nothing downloaded yet".
    // It is not referenced inside this class -- confirm its use in subclasses/callers.
    public static final long NOT_DOWNLOADED_VERSION = -1;
    private static final Logger LOG = LoggerFactory.getLogger(LocallyCachedBlob.class);
    // Callback that does nothing; addReference substitutes it when the caller passes null.
    private static final BlobChangingCallback NOOP_CB = (localAssignment, i, locallyCachedBlob, goodToGo) -> {
    };
    // Description of this blob, used in log messages.
    private final String blobDescription;
    // Key of this blob; exposed through getKey().
    private final String blobKey;
    // Histogram updated by fetch() with the observed download rate.
    private final Histogram fetchingRate;
    // Meter marked by download() when a blob that is already fully downloaded is downloaded again.
    private final Meter numBlobUpdateVersionChanged;
    // Timer measuring each download() call.
    private final Timer singleBlobLocalizationDuration;
    // Current references to this blob, each mapped to the callback invoked before a new version is committed.
    private final ConcurrentHashMap<PortAndAssignment, BlobChangingCallback> references = new ConcurrentHashMap<>();
    // Time.currentTimeMillis() value recorded at construction and refreshed by touch().
    private AtomicLong lastUsed = new AtomicLong(Time.currentTimeMillis());
    // Remote timestamp recorded by requiresUpdate()/download() when the blob was last found or brought in sync; -1 until then.
    protected long localUpdateTime = -1;

    /* loaded from: input_file:org/apache/storm/localizer/LocallyCachedBlob$DownloadMeta.class */
    /**
     * Immutable pair describing a finished download: the path the data was written to
     * and the blob version that was written.
     */
    static class DownloadMeta {
        private final long version;
        private final Path downloadPath;

        /**
         * Creates the holder.
         *
         * @param path the path the blob was downloaded to
         * @param j the version of the downloaded blob
         */
        DownloadMeta(Path path, long j) {
            version = j;
            downloadPath = path;
        }

        /**
         * Returns the version of the downloaded blob.
         */
        public long getVersion() {
            return version;
        }

        /**
         * Returns the path the blob was downloaded to.
         */
        public Path getDownloadPath() {
            return downloadPath;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public LocallyCachedBlob(String str, String str2, StormMetricsRegistry stormMetricsRegistry) {
        this.blobDescription = str;
        this.blobKey = str2;
        this.fetchingRate = stormMetricsRegistry.registerHistogram("supervisor:blob-fetching-rate-MB/s");
        this.numBlobUpdateVersionChanged = stormMetricsRegistry.registerMeter("supervisor:num-blob-update-version-changed");
        this.singleBlobLocalizationDuration = stormMetricsRegistry.registerTimer("supervisor:single-blob-localization-duration");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Downloads a blob from the blob store into a local file.
     *
     * <p>The blob is always downloaded, even when its version equals the local version
     * (a warning is logged in that case). After the copy the number of bytes written is
     * checked against the length reported by the blob stream.
     *
     * @param clientBlobStore the store to read the blob from
     * @param str the key of the blob to download
     * @param iOFunction maps the remote version to the path the blob should be written to
     * @param iOFunction2 opens the output stream for the file to write
     * @return the download path together with the version that was downloaded
     * @throws KeyNotFoundException propagated from the blob store
     * @throws AuthorizationException propagated from the blob store
     * @throws IOException on any I/O failure, or when the number of bytes downloaded
     *     differs from the length reported by the blob
     */
    public DownloadMeta fetch(ClientBlobStore clientBlobStore, String str, IOFunction<Long, Path> iOFunction, IOFunction<File, OutputStream> iOFunction2) throws KeyNotFoundException, AuthorizationException, IOException {
        // try-with-resources closes each stream exactly once and attaches close failures
        // as suppressed exceptions instead of masking the primary failure.
        try (InputStreamWithMeta blob = clientBlobStore.getBlob(str)) {
            final long remoteVersion = blob.getVersion();
            final long localVersion = getLocalVersion();
            if (remoteVersion == localVersion) {
                LOG.warn("The version did not change, but going to download again {} {}", localVersion, str);
            }

            final Path downloadPath = iOFunction.apply(remoteVersion);
            LOG.debug("Downloading {} to {}", str, downloadPath);

            long totalRead = 0;
            long elapsedNanos;
            try (OutputStream out = iOFunction2.apply(downloadPath.toFile())) {
                final long start = Time.nanoTime();
                final byte[] buffer = new byte[4096];
                int read;
                while ((read = blob.read(buffer)) >= 0) {
                    out.write(buffer, 0, read);
                    totalRead += read;
                }
                elapsedNanos = Time.nanoTime() - start;
            }

            final long expectedSize = blob.getFileLength();
            if (totalRead != expectedSize) {
                throw new IOException("We expected to download " + expectedSize + " bytes but found we got " + totalRead);
            }

            // bytes * 1e3 / nanoseconds == MB/s. Skip the sample when no time elapsed:
            // dividing by zero would record Long.MAX_VALUE and distort the histogram.
            if (elapsedNanos > 0) {
                fetchingRate.update(Math.round((totalRead * 1000.0d) / elapsedNanos));
            }
            return new DownloadMeta(downloadPath, remoteVersion);
        }
    }

    /**
     * Returns the version of this blob that is held locally. {@code fetch} and
     * {@code requiresUpdate} compare it with the remote version.
     */
    public abstract long getLocalVersion();

    /**
     * Returns the remote version of this blob, obtained through the given blob store.
     * {@code requiresUpdate} compares it with {@link #getLocalVersion()}.
     */
    public abstract long getRemoteVersion(ClientBlobStore clientBlobStore) throws KeyNotFoundException, AuthorizationException;

    /**
     * Fetches the blob. {@code download} passes the returned value on as the new version to
     * commit. NOTE(review): the name suggests the data is unpacked into a temporary
     * location until committed -- confirm in the subclasses.
     */
    public abstract long fetchUnzipToTemp(ClientBlobStore clientBlobStore) throws IOException, KeyNotFoundException, AuthorizationException;

    /**
     * Commits the given version. Called by {@code informReferencesAndCommitNewVersion}
     * after the current references have been informed of the change.
     */
    protected abstract void commitNewVersion(long j) throws IOException;

    /**
     * Called by {@code download} after every download attempt, whether it succeeded or failed.
     * NOTE(review): presumably deletes leftover data that is not part of the committed version -- confirm.
     */
    public abstract void cleanupOrphanedData() throws IOException;

    /**
     * Not called from this class. NOTE(review): presumably deletes all local data of this blob -- confirm.
     */
    public abstract void completelyRemove() throws IOException;

    /**
     * Not called from this class. NOTE(review): presumably the size of the local copy in bytes;
     * implementations may use the static {@code getSizeOnDisk(Path)} helper -- confirm.
     */
    public abstract long getSizeOnDisk();

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Computes the number of bytes used on disk by a file or directory tree.
     *
     * <p>A path that does not exist yields 0. A regular file yields its own size. For
     * anything else the tree below the path is walked and the sizes of all regular files
     * (symbolic links not followed) are summed. A file whose size cannot be read is
     * logged and counted as 0.
     *
     * @param path the file or directory to measure
     * @return the total size in bytes, or 0 if the path does not exist
     * @throws IOException if the size of a regular file cannot be read or the walk cannot be started
     */
    public static long getSizeOnDisk(Path path) throws IOException {
        if (!Files.exists(path)) {
            return 0L;
        }
        if (Files.isRegularFile(path)) {
            return Files.size(path);
        }
        // The stream returned by Files.walk keeps directory handles open until it is
        // closed, so it must be used inside try-with-resources.
        try (Stream<Path> tree = Files.walk(path)) {
            return tree
                .filter(candidate -> Files.isRegularFile(candidate, LinkOption.NOFOLLOW_LINKS))
                .mapToLong(file -> {
                    try {
                        return Files.size(file);
                    } catch (IOException e) {
                        LOG.warn("Could not get the size of {}", file);
                        return 0L;
                    }
                })
                .sum();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Marks the blob as used right now by storing the current time as its last-used timestamp.
     */
    public void touch() {
        lastUsed.set(Time.currentTimeMillis());
        // Read the value back rather than reusing the local so the log shows what is stored.
        final long recorded = lastUsed.get();
        LOG.debug("Setting {} ts to {}", blobKey, recorded);
    }

    /**
     * Returns the timestamp, in milliseconds, recorded by the most recent {@code touch()}
     * (or at construction if it was never touched).
     */
    public long getLastUsed() {
        final long timestamp = lastUsed.get();
        return timestamp;
    }

    /**
     * Returns true while at least one reference to this blob is registered.
     */
    public boolean isUsed() {
        final boolean unreferenced = references.isEmpty();
        return !unreferenced;
    }

    /**
     * Registers a reference to this blob and refreshes its last-used timestamp.
     *
     * @param portAndAssignment the reference to add
     * @param blobChangingCallback callback to invoke before a new version of the blob is
     *     committed; {@code null} is replaced by a callback that does nothing
     */
    public void addReference(PortAndAssignment portAndAssignment, BlobChangingCallback blobChangingCallback) {
        touch();
        LOG.info("Adding reference {} with timestamp {} to {}", portAndAssignment, getLastUsed(), blobDescription);

        final BlobChangingCallback callback = blobChangingCallback != null ? blobChangingCallback : NOOP_CB;
        final BlobChangingCallback previous = references.put(portAndAssignment, callback);
        if (previous != null) {
            // The new callback has already replaced the old one at this point.
            LOG.warn("{} already has a reservation for {}", portAndAssignment, blobDescription);
        }
    }

    /**
     * Removes the first registered reference that is equivalent to the given one.
     *
     * @param portAndAssignment the reference to remove; matched with {@code isEquivalentTo}
     * @return true if a matching reference was found and removed, false if there was none
     */
    public boolean removeReference(PortAndAssignment portAndAssignment) {
        LOG.info("Removing reference {} from {}", portAndAssignment, blobDescription);

        // Look the key up by equivalence; removal itself uses the stored key.
        PortAndAssignment match = null;
        for (PortAndAssignment held : references.keySet()) {
            if (held.isEquivalentTo(portAndAssignment)) {
                match = held;
                break;
            }
        }

        if (match != null) {
            references.remove(match);
            touch();
            return true;
        }

        LOG.warn("{} had no reservation for {}, current references are {} with last update at {}",
            portAndAssignment, blobDescription, getDependencies(), getLastUsed());
        return false;
    }

    /**
     * Informs every current reference that the blob is about to change, commits the new
     * version, and then completes the future that was handed to the references.
     *
     * @param j the version to commit
     * @throws IOException if committing the new version fails
     */
    public synchronized void informReferencesAndCommitNewVersion(long j) throws IOException {
        final CompletableFuture<Void> doneUpdating = informAllOfChangeAndWaitForConsensus();
        commitNewVersion(j);
        // Reached only when the commit succeeded; if commitNewVersion throws, the future
        // is left incomplete.
        doneUpdating.complete(null);
    }

    /**
     * Notifies every current reference that the blob is about to change and waits up to
     * three minutes for all of them to acknowledge.
     *
     * <p>Each callback receives its own {@code GoodToGo} sharing one latch and one future.
     * The latch is counted down for a reference unless the callback took the latch itself.
     * A timeout or an interrupt ends the wait early; the method still returns normally.
     *
     * @return the future passed to the callbacks; the caller completes it once the new
     *     version has been committed
     */
    private CompletableFuture<Void> informAllOfChangeAndWaitForConsensus() {
        // Work on a snapshot so concurrent add/remove of references cannot change the latch count.
        final Map<PortAndAssignment, BlobChangingCallback> snapshot = new HashMap<>(references);
        final CountDownLatch consensus = new CountDownLatch(snapshot.size());
        final CompletableFuture<Void> doneUpdating = new CompletableFuture<>();

        for (Map.Entry<PortAndAssignment, BlobChangingCallback> entry : snapshot.entrySet()) {
            final GoodToGo goodToGo = new GoodToGo(consensus, doneUpdating);
            try {
                final PortAndAssignment reference = entry.getKey();
                entry.getValue().blobChanging(reference.getAssignment(), reference.getPort(), this, goodToGo);
            } finally {
                // Runs exactly once per reference, whether or not the callback threw.
                goodToGo.countDownIfLatchWasNotGotten();
            }
        }

        try {
            if (!consensus.await(3L, TimeUnit.MINUTES)) {
                LOG.warn("Timed out waiting for references of {} to acknowledge the pending change, continuing anyway",
                    blobDescription);
            }
        } catch (InterruptedException e) {
            // Keep the best-effort behaviour (continue), but do not lose the interrupt.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while waiting for references of {} to acknowledge the pending change, continuing anyway",
                blobDescription);
        }
        return doneUpdating;
    }

    /**
     * Returns the key of this blob.
     */
    public String getKey() {
        final String key = blobKey;
        return key;
    }

    /**
     * Returns the references currently registered for this blob. The returned collection
     * is the key-set view of the internal reference map, not a copy.
     */
    public Collection<PortAndAssignment> getDependencies() {
        final Collection<PortAndAssignment> liveKeys = references.keySet();
        return liveKeys;
    }

    /**
     * Tells whether the blob is fully downloaded. When it returns false,
     * {@code requiresUpdate} reports that a referenced blob needs an update; when it
     * returns true, {@code download} marks the version-changed meter before downloading.
     */
    public abstract boolean isFullyDownloaded();

    /**
     * Decides whether this blob has to be downloaded (again).
     *
     * <p>An unreferenced blob never needs an update; a referenced blob that is not fully
     * downloaded always does. Otherwise the blob is up to date when the positive remote
     * timestamp equals the locally recorded one, or when local and remote versions are
     * equal (in which case the remote timestamp is recorded locally).
     *
     * @param clientBlobStore store used to look up the remote version
     * @param j the remote update timestamp of the blob
     * @return true if the blob should be downloaded
     * @throws KeyNotFoundException propagated from the remote version lookup
     * @throws AuthorizationException propagated from the remote version lookup
     */
    boolean requiresUpdate(ClientBlobStore clientBlobStore, long j) throws KeyNotFoundException, AuthorizationException {
        if (!isUsed()) {
            return false;
        }
        if (!isFullyDownloaded()) {
            return true;
        }

        final boolean timestampMatches = j > 0 && localUpdateTime == j;
        if (timestampMatches) {
            LOG.debug("{} is up to date, blob localUpdateTime matches remote timestamp {}", this, j);
            return false;
        }

        // Local version is read first, then the remote one is queried.
        final long localVersion = getLocalVersion();
        final long remoteVersion = getRemoteVersion(clientBlobStore);
        if (localVersion == remoteVersion) {
            // Same content: remember the timestamp so the next check can skip the remote lookup.
            localUpdateTime = j;
            return false;
        }
        return true;
    }

    /**
     * Downloads the blob, informs its references, commits the new version and records the
     * remote timestamp it is now in sync with.
     *
     * <p>The localization timer is stopped and {@code cleanupOrphanedData()} is run exactly
     * once, whether the download succeeded or failed.
     *
     * @param clientBlobStore store to download the blob from
     * @param j remote update timestamp to record after a successful download
     * @throws AuthorizationException propagated from fetching the blob
     * @throws IOException if fetching, committing or the final cleanup fails
     * @throws KeyNotFoundException propagated from fetching the blob
     */
    private void download(ClientBlobStore clientBlobStore, long j) throws AuthorizationException, IOException, KeyNotFoundException {
        if (isFullyDownloaded()) {
            // A complete local copy exists, so this download replaces an existing version.
            numBlobUpdateVersionChanged.mark();
        }
        final Timer.Context timing = singleBlobLocalizationDuration.time();
        try {
            final long newVersion = fetchUnzipToTemp(clientBlobStore);
            informReferencesAndCommitNewVersion(newVersion);
            localUpdateTime = j;
            LOG.debug("local blob {} downloaded, in sync with remote blobstore to time {}", this, j);
        } finally {
            // A single finally block keeps a failing cleanup from stopping the timer and
            // running the cleanup a second time.
            timing.stop();
            cleanupOrphanedData();
        }
    }

    /**
     * Downloads the blob if it needs an update. The check and the download run while
     * holding this object's monitor.
     *
     * @param clientBlobStore store used to check the remote version and to download
     * @param j the remote update timestamp of the blob
     * @throws KeyNotFoundException propagated from the check or the download
     * @throws AuthorizationException propagated from the check or the download
     * @throws IOException if the download fails
     */
    public void update(ClientBlobStore clientBlobStore, long j) throws KeyNotFoundException, AuthorizationException, IOException {
        synchronized (this) {
            final boolean stale = requiresUpdate(clientBlobStore, j);
            if (!stale) {
                return;
            }
            download(clientBlobStore, j);
        }
    }
}
