/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.rocksdb;

import java.io.Closeable;
import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.shaded.guava33.com.google.common.primitives.UnsignedBytes;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.apache.flink.util.function.RunnableWithException;
import org.rocksdb.Checkpoint;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.CompactRangeOptions;
import org.rocksdb.ExportImportFilesMetaData;
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDBIncrementalCheckpointUtils {
    private static final Logger logger = LoggerFactory.getLogger(RocksDBIncrementalCheckpointUtils.class);

    private static Score stateHandleEvaluator(KeyedStateHandle stateHandle, KeyGroupRange targetKeyGroupRange, double overlapFractionThreshold) {
        KeyGroupRange handleKeyGroupRange = stateHandle.getKeyGroupRange();
        KeyGroupRange intersectGroup = handleKeyGroupRange.getIntersection(targetKeyGroupRange);
        double overlapFraction = (double)intersectGroup.getNumberOfKeyGroups() / (double)handleKeyGroupRange.getNumberOfKeyGroups();
        if (overlapFraction < overlapFractionThreshold) {
            return Score.MIN;
        }
        return new Score(intersectGroup.getNumberOfKeyGroups(), overlapFraction);
    }

    public static void clipDBWithKeyGroupRange(@Nonnull RocksDB db, @Nonnull List<ColumnFamilyHandle> columnFamilyHandles, @Nonnull KeyGroupRange targetKeyGroupRange, @Nonnull KeyGroupRange currentKeyGroupRange, @Nonnegative int keyGroupPrefixBytes, boolean useDeleteFilesInRange) throws RocksDBException {
        ArrayList<byte[]> deleteFilesRanges = new ArrayList<byte[]>(4);
        if (currentKeyGroupRange.getStartKeyGroup() < targetKeyGroupRange.getStartKeyGroup()) {
            RocksDBIncrementalCheckpointUtils.prepareRangeDeletes(keyGroupPrefixBytes, currentKeyGroupRange.getStartKeyGroup(), targetKeyGroupRange.getStartKeyGroup(), deleteFilesRanges);
        }
        if (currentKeyGroupRange.getEndKeyGroup() > targetKeyGroupRange.getEndKeyGroup()) {
            RocksDBIncrementalCheckpointUtils.prepareRangeDeletes(keyGroupPrefixBytes, targetKeyGroupRange.getEndKeyGroup() + 1, currentKeyGroupRange.getEndKeyGroup() + 1, deleteFilesRanges);
        }
        logger.info("Performing range delete for backend with target key-groups range {} with boundaries set {} - deleteFilesInRanges = {}.", new Object[]{targetKeyGroupRange.prettyPrintInterval(), deleteFilesRanges.stream().map(Arrays::toString).collect(Collectors.toList()), useDeleteFilesInRange});
        RocksDBIncrementalCheckpointUtils.deleteRangeData(db, columnFamilyHandles, deleteFilesRanges, useDeleteFilesInRange);
    }

    private static void prepareRangeDeletes(int keyGroupPrefixBytes, int beginKeyGroup, int endKeyGroup, List<byte[]> deleteFilesRangesOut) {
        byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
        byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];
        CompositeKeySerializationUtils.serializeKeyGroup((int)beginKeyGroup, (byte[])beginKeyGroupBytes);
        CompositeKeySerializationUtils.serializeKeyGroup((int)endKeyGroup, (byte[])endKeyGroupBytes);
        deleteFilesRangesOut.add(beginKeyGroupBytes);
        deleteFilesRangesOut.add(endKeyGroupBytes);
    }

    private static void deleteRangeData(RocksDB db, List<ColumnFamilyHandle> columnFamilyHandles, List<byte[]> deleteRanges, boolean useDeleteFilesInRange) throws RocksDBException {
        if (deleteRanges.isEmpty()) {
            return;
        }
        Preconditions.checkArgument((deleteRanges.size() % 2 == 0 ? 1 : 0) != 0);
        for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
            if (useDeleteFilesInRange) {
                db.deleteFilesInRanges(columnFamilyHandle, deleteRanges, false);
            }
            for (int i = 0; i < deleteRanges.size() / 2; ++i) {
                db.deleteRange(columnFamilyHandle, deleteRanges.get(i * 2), deleteRanges.get(i * 2 + 1));
            }
        }
    }

    public static RunnableWithException createAsyncRangeCompactionTask(RocksDB db, Collection<ColumnFamilyHandle> columnFamilyHandles, int keyGroupPrefixBytes, KeyGroupRange dbExpectedKeyGroupRange, ResourceGuard rocksDBResourceGuard, CloseableRegistry closeableRegistry) {
        return () -> {
            block31: {
                RangeCheckResult rangeCheckResult;
                logger.debug("Starting range check for async compaction targeting key-groups range {}.", (Object)dbExpectedKeyGroupRange.prettyPrintInterval());
                try (ResourceGuard.Lease ignored = rocksDBResourceGuard.acquireResource();){
                    rangeCheckResult = RocksDBIncrementalCheckpointUtils.checkSstDataAgainstKeyGroupRange(db, keyGroupPrefixBytes, dbExpectedKeyGroupRange);
                }
                if (rangeCheckResult.allInRange()) {
                    logger.debug("Nothing to compact in async compaction targeting key-groups range {}.", (Object)dbExpectedKeyGroupRange.prettyPrintInterval());
                    return;
                }
                try (CompactRangeOptions compactionOptions = new CompactRangeOptions().setBottommostLevelCompaction(CompactRangeOptions.BottommostLevelCompaction.kForceOptimized);){
                    Closeable cancelCompactionCloseable = () -> {
                        logger.info("Cancel request for async compaction targeting key-groups range {}.", (Object)dbExpectedKeyGroupRange.prettyPrintInterval(), (Object)new Exception("StackTrace"));
                        compactionOptions.setCanceled(true);
                    };
                    try {
                        ResourceGuard.Lease ignored;
                        closeableRegistry.registerCloseable((AutoCloseable)cancelCompactionCloseable);
                        if (!rangeCheckResult.leftInRange) {
                            logger.debug("Compacting left interval in async compaction targeting key-groups range {}.", (Object)dbExpectedKeyGroupRange.prettyPrintInterval());
                            for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
                                ignored = rocksDBResourceGuard.acquireResource();
                                try {
                                    db.compactRange(columnFamilyHandle, new byte[0], rangeCheckResult.getProclaimedMinKey(), compactionOptions);
                                }
                                finally {
                                    if (ignored != null) {
                                        ignored.close();
                                    }
                                }
                            }
                        }
                        if (rangeCheckResult.rightInRange) break block31;
                        logger.debug("Compacting right interval in async compaction targeting key-groups range {}.", (Object)dbExpectedKeyGroupRange.prettyPrintInterval());
                        for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
                            ignored = rocksDBResourceGuard.acquireResource();
                            try {
                                db.compactRange(columnFamilyHandle, rangeCheckResult.getProclaimedMaxKey(), new byte[]{-1, -1, -1, -1}, compactionOptions);
                            }
                            finally {
                                if (ignored == null) continue;
                                ignored.close();
                            }
                        }
                    }
                    finally {
                        closeableRegistry.unregisterCloseable((AutoCloseable)cancelCompactionCloseable);
                    }
                }
            }
        };
    }

    public static RangeCheckResult checkSstDataAgainstKeyGroupRange(RocksDB db, int keyGroupPrefixBytes, KeyGroupRange dbExpectedKeyGroupRange) {
        byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
        byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];
        CompositeKeySerializationUtils.serializeKeyGroup((int)dbExpectedKeyGroupRange.getStartKeyGroup(), (byte[])beginKeyGroupBytes);
        CompositeKeySerializationUtils.serializeKeyGroup((int)(dbExpectedKeyGroupRange.getEndKeyGroup() + 1), (byte[])endKeyGroupBytes);
        KeyRange dbKeyRange = RocksDBIncrementalCheckpointUtils.getDBKeyRange(db);
        return RangeCheckResult.of(beginKeyGroupBytes, endKeyGroupBytes, dbKeyRange.minKey, dbKeyRange.maxKey, keyGroupPrefixBytes);
    }

    private static KeyRange getDBKeyRange(RocksDB db) {
        Comparator comparator = UnsignedBytes.lexicographicalComparator();
        List liveFilesMetaData = db.getLiveFilesMetaData();
        if (liveFilesMetaData.isEmpty()) {
            return KeyRange.EMPTY;
        }
        Iterator liveFileMetaDataIterator = liveFilesMetaData.iterator();
        LiveFileMetaData fileMetaData = (LiveFileMetaData)liveFileMetaDataIterator.next();
        byte[] smallestKey = fileMetaData.smallestKey();
        byte[] largestKey = fileMetaData.largestKey();
        while (liveFileMetaDataIterator.hasNext()) {
            fileMetaData = (LiveFileMetaData)liveFileMetaDataIterator.next();
            byte[] sstSmallestKey = fileMetaData.smallestKey();
            byte[] sstLargestKey = fileMetaData.largestKey();
            if (comparator.compare(sstSmallestKey, smallestKey) < 0) {
                smallestKey = sstSmallestKey;
            }
            if (comparator.compare(sstLargestKey, largestKey) <= 0) continue;
            largestKey = sstLargestKey;
        }
        return KeyRange.of(smallestKey, largestKey);
    }

    public static void exportColumnFamilies(RocksDB db, List<ColumnFamilyHandle> columnFamilyHandles, List<RegisteredStateMetaInfoBase> registeredStateMetaInfoBases, Path exportBasePath, Map<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>> resultOutput) throws RocksDBException {
        Preconditions.checkArgument((columnFamilyHandles.size() == registeredStateMetaInfoBases.size() ? 1 : 0) != 0, (Object)"Lists are aligned by index and must be of the same size!");
        try (Checkpoint checkpoint = Checkpoint.create((RocksDB)db);){
            for (int i = 0; i < columnFamilyHandles.size(); ++i) {
                RegisteredStateMetaInfoBase.Key stateMetaInfoAsKey = registeredStateMetaInfoBases.get(i).asMapKey();
                Path subPath = exportBasePath.resolve(UUID.randomUUID().toString());
                ExportImportFilesMetaData exportedColumnFamilyMetaData = checkpoint.exportColumnFamily(columnFamilyHandles.get(i), subPath.toString());
                File[] exportedSstFiles = subPath.toFile().listFiles((file, name) -> name.toLowerCase().endsWith(".sst"));
                if (exportedSstFiles != null && exportedSstFiles.length > 0) {
                    resultOutput.computeIfAbsent(stateMetaInfoAsKey, key -> new ArrayList()).add(exportedColumnFamilyMetaData);
                    continue;
                }
                IOUtils.closeQuietly((AutoCloseable)exportedColumnFamilyMetaData);
            }
        }
    }

    public static boolean beforeThePrefixBytes(@Nonnull byte[] bytes, @Nonnull byte[] prefixBytes) {
        int prefixLength = prefixBytes.length;
        for (int i = 0; i < prefixLength; ++i) {
            int r = (char)prefixBytes[i] - (char)bytes[i];
            if (r == 0) continue;
            return r > 0;
        }
        return false;
    }

    @Nullable
    public static <T extends KeyedStateHandle> T chooseTheBestStateHandleForInitial(@Nonnull List<T> restoreStateHandles, @Nonnull KeyGroupRange targetKeyGroupRange, double overlapFractionThreshold) {
        int pos = RocksDBIncrementalCheckpointUtils.findTheBestStateHandleForInitial(restoreStateHandles, targetKeyGroupRange, overlapFractionThreshold);
        return (T)(pos >= 0 ? (KeyedStateHandle)restoreStateHandles.get(pos) : null);
    }

    public static <T extends KeyedStateHandle> int findTheBestStateHandleForInitial(@Nonnull List<T> restoreStateHandles, @Nonnull KeyGroupRange targetKeyGroupRange, double overlapFractionThreshold) {
        if (restoreStateHandles.isEmpty()) {
            return -1;
        }
        if (restoreStateHandles.size() == 1) {
            return 0;
        }
        int currentPos = 0;
        int bestHandlePos = 0;
        Score bestScore = Score.MIN;
        for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
            Score handleScore = RocksDBIncrementalCheckpointUtils.stateHandleEvaluator(rawStateHandle, targetKeyGroupRange, overlapFractionThreshold);
            if (handleScore.compareTo(bestScore) > 0) {
                bestHandlePos = currentPos;
                bestScore = handleScore;
            }
            ++currentPos;
        }
        return bestHandlePos;
    }

    public static final class RangeCheckResult {
        private final byte[] proclaimedMinKey;
        private final byte[] proclaimedMaxKey;
        private final byte[] actualMinKey;
        private final byte[] actualMaxKey;
        final boolean leftInRange;
        final boolean rightInRange;
        final int keyGroupPrefixBytes;

        private RangeCheckResult(byte[] proclaimedMinKey, byte[] proclaimedMaxKey, byte[] actualMinKey, byte[] actualMaxKey, int keyGroupPrefixBytes) {
            Comparator comparator = UnsignedBytes.lexicographicalComparator();
            this.proclaimedMinKey = proclaimedMinKey;
            this.proclaimedMaxKey = proclaimedMaxKey;
            this.actualMinKey = actualMinKey;
            this.actualMaxKey = actualMaxKey;
            this.leftInRange = comparator.compare(actualMinKey, proclaimedMinKey) >= 0;
            this.rightInRange = comparator.compare(actualMaxKey, proclaimedMaxKey) < 0;
            this.keyGroupPrefixBytes = keyGroupPrefixBytes;
        }

        public boolean allInRange() {
            return this.leftInRange && this.rightInRange;
        }

        public byte[] getProclaimedMinKey() {
            return this.proclaimedMinKey;
        }

        public byte[] getProclaimedMaxKey() {
            return this.proclaimedMaxKey;
        }

        public byte[] getActualMinKey() {
            return this.actualMinKey;
        }

        public byte[] getActualMaxKey() {
            return this.actualMaxKey;
        }

        public int getKeyGroupPrefixBytes() {
            return this.keyGroupPrefixBytes;
        }

        public boolean isLeftInRange() {
            return this.leftInRange;
        }

        public boolean isRightInRange() {
            return this.rightInRange;
        }

        static RangeCheckResult of(byte[] proclaimedMinKey, byte[] proclaimedMaxKey, byte[] actualMinKey, byte[] actualMaxKey, int keyGroupPrefixBytes) {
            return new RangeCheckResult(proclaimedMinKey, proclaimedMaxKey, actualMinKey, actualMaxKey, keyGroupPrefixBytes);
        }

        public String toString() {
            return "RangeCheckResult{leftInRange=" + this.leftInRange + ", rightInRange=" + this.rightInRange + ", actualMinKeyGroup=" + CompositeKeySerializationUtils.extractKeyGroup((int)this.keyGroupPrefixBytes, (byte[])this.getActualMinKey()) + ", actualMaxKeyGroup=" + CompositeKeySerializationUtils.extractKeyGroup((int)this.keyGroupPrefixBytes, (byte[])this.getActualMaxKey()) + "}";
        }
    }

    private static final class KeyRange {
        static final KeyRange EMPTY = KeyRange.of(new byte[0], new byte[0]);
        final byte[] minKey;
        final byte[] maxKey;

        private KeyRange(byte[] minKey, byte[] maxKey) {
            this.minKey = minKey;
            this.maxKey = maxKey;
        }

        static KeyRange of(byte[] minKey, byte[] maxKey) {
            return new KeyRange(minKey, maxKey);
        }
    }

    private static class Score
    implements Comparable<Score> {
        public static final Score MIN = new Score(Integer.MIN_VALUE, -1.0);
        private final int intersectGroupRange;
        private final double overlapFraction;

        public Score(int intersectGroupRange, double overlapFraction) {
            this.intersectGroupRange = intersectGroupRange;
            this.overlapFraction = overlapFraction;
        }

        public int getIntersectGroupRange() {
            return this.intersectGroupRange;
        }

        public double getOverlapFraction() {
            return this.overlapFraction;
        }

        @Override
        public int compareTo(@Nullable Score other) {
            return Comparator.nullsFirst(Comparator.comparing(Score::getIntersectGroupRange).thenComparing(Score::getOverlapFraction)).compare(this, other);
        }
    }
}

