/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.changelog.fs;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.changelog.fs.ChangelogStreamHandleReader;
import org.apache.flink.changelog.fs.ChangelogStreamWrapper;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.SchedulerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RefCountedFile;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ChangelogStreamHandleReaderWithCache
implements ChangelogStreamHandleReader {
    private static final Logger LOG = LoggerFactory.getLogger(ChangelogStreamHandleReaderWithCache.class);
    private static final String CACHE_FILE_SUB_DIR = "dstl-cache-file";
    private static final String CACHE_FILE_PREFIX = "dstl";
    private static final int NO_USING_REF_COUNT = 1;
    private final File[] cacheDirectories;
    private final AtomicInteger next;
    private final ConcurrentHashMap<Path, RefCountedFile> cache = new ConcurrentHashMap();
    private final ScheduledExecutorService cacheCleanScheduler;
    private final long cacheIdleMillis;

    ChangelogStreamHandleReaderWithCache(Configuration config) {
        this.cacheDirectories = (File[])Arrays.stream(ConfigurationUtils.parseTempDirectories((Configuration)config)).map(path -> new File((String)path, CACHE_FILE_SUB_DIR)).toArray(File[]::new);
        Arrays.stream(this.cacheDirectories).forEach(File::mkdirs);
        this.next = new AtomicInteger(new Random().nextInt(this.cacheDirectories.length));
        this.cacheCleanScheduler = SchedulerFactory.create(1, "ChangelogCacheFileCleanScheduler", LOG);
        this.cacheIdleMillis = ((Duration)config.get(FsStateChangelogOptions.CACHE_IDLE_TIMEOUT)).toMillis();
    }

    @Override
    public DataInputStream openAndSeek(StreamStateHandle handle, Long offset) throws IOException {
        if (!this.canBeCached(handle)) {
            return ChangelogStreamWrapper.wrapAndSeek((InputStream)handle.openInputStream(), offset);
        }
        FileStateHandle fileHandle = (FileStateHandle)handle;
        RefCountedFile refCountedFile = this.getRefCountedFile(fileHandle);
        FileInputStream fin = this.openAndSeek(refCountedFile, (long)offset);
        LOG.debug("return cached file {} (rc={}) for {} (offset={})", new Object[]{refCountedFile.getFile(), refCountedFile.getReferenceCounter(), handle.getStreamStateHandleID(), offset});
        return this.wrapStream(fileHandle.getFilePath(), fin);
    }

    private boolean canBeCached(StreamStateHandle handle) throws IOException {
        if (handle instanceof FileStateHandle) {
            FileStateHandle fileHandle = (FileStateHandle)handle;
            return fileHandle.getFilePath().getFileSystem().isDistributedFS();
        }
        return false;
    }

    private RefCountedFile getRefCountedFile(FileStateHandle fileHandle) {
        return this.cache.compute(fileHandle.getFilePath(), (key, oldValue) -> {
            if (oldValue == null) {
                oldValue = this.downloadToCacheFile(fileHandle);
            }
            oldValue.retain();
            return oldValue;
        });
    }

    /*
     * Exception decompiling
     */
    private RefCountedFile downloadToCacheFile(FileStateHandle fileHandle) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private FileInputStream openAndSeek(RefCountedFile refCountedFile, long offset) throws IOException {
        FileInputStream fin = new FileInputStream(refCountedFile.getFile());
        if (offset != 0L) {
            LOG.trace("seek to {}", (Object)offset);
            fin.getChannel().position(offset);
        }
        return fin;
    }

    private DataInputStream wrapStream(final Path dfsPath, FileInputStream fin) {
        return new DataInputStream(new BufferedInputStream(fin)){
            private boolean closed;
            {
                super(x0);
                this.closed = false;
            }

            @Override
            public void close() throws IOException {
                if (!this.closed) {
                    this.closed = true;
                    try {
                        super.close();
                    }
                    finally {
                        ChangelogStreamHandleReaderWithCache.this.cache.computeIfPresent(dfsPath, (key, value) -> {
                            value.release();
                            if (value.getReferenceCounter() == 1) {
                                ChangelogStreamHandleReaderWithCache.this.cacheCleanScheduler.schedule(() -> ChangelogStreamHandleReaderWithCache.this.cleanCacheFile(dfsPath), ChangelogStreamHandleReaderWithCache.this.cacheIdleMillis, TimeUnit.MILLISECONDS);
                            }
                            return value;
                        });
                    }
                }
            }
        };
    }

    private void cleanCacheFile(Path dfsPath) {
        this.cache.computeIfPresent(dfsPath, (key, value) -> {
            if (value.getReferenceCounter() == 1) {
                LOG.debug("clean cached file : {} after {}ms idle", (Object)value.getFile().getPath(), (Object)this.cacheIdleMillis);
                value.release();
                return null;
            }
            return value;
        });
    }

    @Override
    public void close() throws Exception {
        this.cacheCleanScheduler.shutdownNow();
        if (!this.cacheCleanScheduler.awaitTermination(5L, TimeUnit.SECONDS)) {
            LOG.warn("Unable to cleanly shutdown cache clean scheduler of ChangelogHandleReaderWithCache in 5s");
        }
        Iterator<RefCountedFile> iterator = this.cache.values().iterator();
        while (iterator.hasNext()) {
            RefCountedFile cacheFile = iterator.next();
            iterator.remove();
            LOG.debug("cleanup on close: {}", (Object)cacheFile.getFile().toPath());
            Files.deleteIfExists(cacheFile.getFile().toPath());
        }
    }
}

