/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst.fs.cache;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.ThreadSafeSimpleCounter;
import org.apache.flink.state.forst.ForStOptions;
import org.apache.flink.state.forst.fs.cache.CacheLimitPolicy;
import org.apache.flink.state.forst.fs.cache.CachedDataInputStream;
import org.apache.flink.state.forst.fs.cache.CachedDataOutputStream;
import org.apache.flink.state.forst.fs.cache.DoubleListLru;
import org.apache.flink.state.forst.fs.cache.FileCacheEntry;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class FileBasedCache
extends DoubleListLru<String, FileCacheEntry>
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(FileBasedCache.class);
    private static final String FORST_CACHE_PREFIX = "forst.fileCache";
    private static final ThreadLocal<Boolean> isFlinkThread = ThreadLocal.withInitial(() -> false);
    private final CacheLimitPolicy cacheLimitPolicy;
    final FileSystem cacheFs;
    private final Path basePath;
    private final int accessBeforePromote;
    private final int promoteLimit;
    private volatile boolean closed = false;
    private final ExecutorService executorService;
    private Counter hitCounter;
    private Counter missCounter;
    private Counter loadBackCounter;
    private Counter evictCounter;
    private long secondAccessEpoch = 0L;

    public FileBasedCache(ReadableConfig configuration, CacheLimitPolicy cacheLimitPolicy, FileSystem cacheFs, Path basePath, MetricGroup metricGroup) {
        this.cacheLimitPolicy = cacheLimitPolicy;
        this.cacheFs = cacheFs;
        this.basePath = basePath;
        this.accessBeforePromote = Math.max(1, (Integer)configuration.get(ForStOptions.CACHE_LRU_ACCESS_BEFORE_PROMOTION));
        this.promoteLimit = (Integer)configuration.get(ForStOptions.CACHE_LRU_PROMOTION_LIMIT);
        this.executorService = Executors.newFixedThreadPool(4, (ThreadFactory)new ExecutorThreadFactory("ForSt-LruLoader"));
        if (metricGroup != null) {
            this.hitCounter = metricGroup.counter("forst.fileCache.hit", (Counter)new ThreadSafeSimpleCounter());
            this.missCounter = metricGroup.counter("forst.fileCache.miss", (Counter)new ThreadSafeSimpleCounter());
            this.loadBackCounter = metricGroup.counter("forst.fileCache.lru.loadback", (Counter)new ThreadSafeSimpleCounter());
            this.evictCounter = metricGroup.counter("forst.fileCache.lru.evict", (Counter)new ThreadSafeSimpleCounter());
            metricGroup.gauge("forst.fileCache.usedBytes", cacheLimitPolicy::usedBytes);
            cacheLimitPolicy.registerCustomizedMetrics(FORST_CACHE_PREFIX, metricGroup);
        }
        LOG.info("FileBasedCache initialized, basePath: {}, cache limit policy: {}", (Object)basePath, (Object)cacheLimitPolicy);
    }

    public static void setFlinkThread() {
        isFlinkThread.set(true);
    }

    @VisibleForTesting
    public static void unsetFlinkThread() {
        isFlinkThread.set(false);
    }

    public static boolean isFlinkThread() {
        return isFlinkThread.get();
    }

    public void incHitCounter() {
        if (this.hitCounter != null && isFlinkThread.get().booleanValue()) {
            this.hitCounter.inc();
        }
    }

    public void incMissCounter() {
        if (this.missCounter != null && isFlinkThread.get().booleanValue()) {
            this.missCounter.inc();
        }
    }

    private Path getCachePath(Path fromOriginal) {
        return new Path(this.basePath, fromOriginal.getName());
    }

    public CachedDataInputStream open(Path path, FSDataInputStream originalStream) throws IOException {
        if (this.closed) {
            return null;
        }
        FileCacheEntry entry = this.get(this.getCachePath(path).toString(), FileBasedCache.isFlinkThread());
        if (entry != null) {
            return entry.open(originalStream);
        }
        return null;
    }

    public CachedDataOutputStream create(FSDataOutputStream originalOutputStream, Path path) throws IOException {
        if (this.closed) {
            return null;
        }
        Path cachePath = this.getCachePath(path);
        return new CachedDataOutputStream(path, cachePath, originalOutputStream, this.cacheLimitPolicy.directWriteInCache() ? this.cacheFs.create(cachePath, FileSystem.WriteMode.OVERWRITE) : null, this);
    }

    public void delete(Path path) {
        if (!this.closed) {
            this.remove(this.getCachePath(path).toString());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public FileCacheEntry get(String key, boolean affectOrder) {
        FileBasedCache fileBasedCache = this;
        synchronized (fileBasedCache) {
            return (FileCacheEntry)((Object)super.get(key, affectOrder));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addFirst(String key, FileCacheEntry value) {
        FileBasedCache fileBasedCache = this;
        synchronized (fileBasedCache) {
            super.addFirst(key, value);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addSecond(String key, FileCacheEntry value) {
        FileBasedCache fileBasedCache = this;
        synchronized (fileBasedCache) {
            super.addSecond(key, value);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public FileCacheEntry remove(String key) {
        FileBasedCache fileBasedCache = this;
        synchronized (fileBasedCache) {
            return (FileCacheEntry)((Object)super.remove(key));
        }
    }

    public void registerInCache(Path originalPath, long size) {
        Path cachePath = this.getCachePath(originalPath);
        FileCacheEntry fileCacheEntry = new FileCacheEntry(this, originalPath, cachePath, size);
        fileCacheEntry.accessCountInColdLink = Math.max(0, this.accessBeforePromote - 2);
        this.addSecond(cachePath.toString(), fileCacheEntry);
    }

    void removeFile(FileCacheEntry entry) {
        if (this.closed) {
            entry.doRemoveFile();
        } else {
            this.executorService.submit(entry::doRemoveFile);
        }
    }

    @Override
    public void close() throws IOException {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.executorService.shutdown();
        for (Tuple2 entry : this) {
            ((FileCacheEntry)((Object)entry.f1)).close();
        }
    }

    @Override
    boolean isSafeToAddFirst(FileCacheEntry value) {
        return this.cacheLimitPolicy.isSafeToAdd(value.entrySize);
    }

    @Override
    void newNodeCreated(FileCacheEntry value, DoubleListLru.Node n) {
        value.setTouchFunction(() -> {
            FileBasedCache fileBasedCache = this;
            synchronized (fileBasedCache) {
                this.accessNode(n);
            }
        });
    }

    @Override
    void addedToFirst(FileCacheEntry value) {
        LOG.trace("Cache entry {} added to first link.", (Object)value.cachePath);
        while (this.cacheLimitPolicy.isOverflow(value.entrySize, value.checkStatus(FileCacheEntry.EntryStatus.LOADED))) {
            this.moveMiddleFront();
        }
        this.cacheLimitPolicy.acquire(value.entrySize);
    }

    @Override
    void addedToSecond(FileCacheEntry value) {
        LOG.trace("Cache entry {} added to second link.", (Object)value.cachePath);
        value.secondAccessEpoch = ++this.secondAccessEpoch;
        this.tryEvict(value);
    }

    @Override
    void removedFromFirst(FileCacheEntry value) {
        this.cacheLimitPolicy.release(value.entrySize);
        value.close();
    }

    @Override
    void removedFromSecond(FileCacheEntry value) {
        value.close();
    }

    @Override
    void movedToFirst(FileCacheEntry entry) {
        LOG.trace("Cache entry {} moved to first link.", (Object)entry.cachePath);
        if (entry.switchStatus(FileCacheEntry.EntryStatus.INVALID, FileCacheEntry.EntryStatus.LOADED)) {
            entry.loaded();
            if (this.loadBackCounter != null) {
                this.loadBackCounter.inc();
            }
        }
        if (entry.switchStatus(FileCacheEntry.EntryStatus.REMOVED, FileCacheEntry.EntryStatus.LOADING)) {
            this.executorService.submit(() -> {
                if (entry.checkStatus(FileCacheEntry.EntryStatus.LOADING)) {
                    Path path = entry.load();
                    if (path == null) {
                        entry.switchStatus(FileCacheEntry.EntryStatus.LOADING, FileCacheEntry.EntryStatus.REMOVED);
                    } else if (entry.switchStatus(FileCacheEntry.EntryStatus.LOADING, FileCacheEntry.EntryStatus.LOADED)) {
                        entry.loaded();
                        if (this.loadBackCounter != null) {
                            this.loadBackCounter.inc();
                        }
                    } else {
                        try {
                            path.getFileSystem().delete(path, false);
                        }
                        catch (IOException iOException) {
                            // empty catch block
                        }
                    }
                }
            });
        }
    }

    @Override
    void movedToSecond(FileCacheEntry value) {
        LOG.trace("Cache entry {} moved to second link.", (Object)value.cachePath);
        this.cacheLimitPolicy.release(value.entrySize);
        this.tryEvict(value);
    }

    @Override
    boolean nodeAccessedAtSecond(FileCacheEntry value) {
        if (this.secondAccessEpoch - value.secondAccessEpoch >= (long)(this.getSecondSize() / 2)) {
            value.accessCountInColdLink = 0;
            ++this.secondAccessEpoch;
        }
        value.secondAccessEpoch = this.secondAccessEpoch;
        return value.evictCount < this.promoteLimit && ++value.accessCountInColdLink >= this.accessBeforePromote;
    }

    @Override
    void promotedToFirst(FileCacheEntry value) {
        value.accessCountInColdLink = 0;
        while (this.cacheLimitPolicy.isOverflow(value.entrySize, true)) {
            this.moveMiddleFront();
        }
        this.cacheLimitPolicy.acquire(value.entrySize);
    }

    private void tryEvict(FileCacheEntry value) {
        if (value.invalidate() && this.evictCounter != null) {
            this.evictCounter.inc();
            ++value.evictCount;
        }
    }
}

