/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.mergetree;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.paimon.KeyValue;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.lookup.LookupStoreFactory;
import org.apache.paimon.lookup.LookupStoreWriter;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.mergetree.Levels;
import org.apache.paimon.mergetree.LookupFile;
import org.apache.paimon.mergetree.LookupUtils;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BloomFilter;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.IOFunction;
import org.apache.paimon.utils.VarLengthIntUtils;

public class LookupLevels<T>
implements Levels.DropFileCallback,
Closeable {
    private final Levels levels;
    private final Comparator<InternalRow> keyComparator;
    private final RowCompactedSerializer keySerializer;
    private final ValueProcessor<T> valueProcessor;
    private final IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory;
    private final Function<String, File> localFileFactory;
    private final LookupStoreFactory lookupStoreFactory;
    private final Function<Long, BloomFilter.Builder> bfGenerator;
    private final Cache<String, LookupFile> lookupFileCache;
    private final Set<String> ownCachedFiles;

    /**
     * Creates a lookup view over {@code levels}.
     *
     * <p>Builds the key serializer from {@code keyType}, starts with an empty set of own cached
     * file names, and finally registers this instance as a drop-file callback on {@code levels}.
     *
     * @param levels the levels that lookups are run against
     * @param keyComparator comparator handed to the {@link LookupUtils} lookups
     * @param keyType row type used to build the serializer that turns keys into bytes
     * @param valueProcessor converts values to and from the bytes stored in lookup files
     * @param fileReaderFactory opens a reader over a data file when a lookup file is built
     * @param localFileFactory maps a data file name to the local file to write
     * @param lookupStoreFactory creates the writer and reader of the local lookup store
     * @param bfGenerator given a file's row count, supplies the bloom filter builder passed to
     *     the store writer
     * @param lookupFileCache cache of lookup files keyed by data file name
     */
    public LookupLevels(
            Levels levels,
            Comparator<InternalRow> keyComparator,
            RowType keyType,
            ValueProcessor<T> valueProcessor,
            IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
            Function<String, File> localFileFactory,
            LookupStoreFactory lookupStoreFactory,
            Function<Long, BloomFilter.Builder> bfGenerator,
            Cache<String, LookupFile> lookupFileCache) {
        // Collaborators supplied by the caller.
        this.levels = levels;
        this.keyComparator = keyComparator;
        this.valueProcessor = valueProcessor;
        this.fileReaderFactory = fileReaderFactory;
        this.localFileFactory = localFileFactory;
        this.lookupStoreFactory = lookupStoreFactory;
        this.bfGenerator = bfGenerator;
        this.lookupFileCache = lookupFileCache;

        // State owned by this instance.
        this.keySerializer = new RowCompactedSerializer(keyType);
        this.ownCachedFiles = new HashSet<>();

        // Registered last, once every field has been assigned.
        levels.addDropFileCallback(this);
    }

    /** Returns the {@link Levels} instance this object was constructed with. */
    public Levels getLevels() {
        return levels;
    }

    /** Returns the lookup file cache passed to the constructor (the same instance, not a copy). */
    @VisibleForTesting
    Cache<String, LookupFile> lookupFiles() {
        return lookupFileCache;
    }

    /**
     * Returns the live set of data file names whose lookup files were created by this instance.
     * The internal set itself is returned, not a copy.
     */
    @VisibleForTesting
    Set<String> cachedFiles() {
        return ownCachedFiles;
    }

    /**
     * Drop-file callback: invalidates the lookup file cache entry keyed by {@code file}.
     *
     * @param file name of the dropped data file
     */
    @Override
    public void notifyDropFile(String file) {
        lookupFileCache.invalidate(file);
    }

    /**
     * Looks up {@code key} by delegating to {@link LookupUtils}, handing it the levels, the start
     * level, and this class's per-run and level-0 lookup methods.
     *
     * @param key the key to search for
     * @param startLevel level passed through to {@link LookupUtils} as the starting level
     * @return whatever the delegate returns; may be {@code null}
     * @throws IOException if the underlying lookup fails
     */
    @Nullable
    public T lookup(InternalRow key, int startLevel) throws IOException {
        return (T)
                LookupUtils.lookup(
                        levels, key, startLevel, this::lookup, this::lookupLevel0);
    }

    /**
     * Looks up {@code key} among the level-0 files by delegating to {@link LookupUtils}, which is
     * given the key comparator and this class's per-file lookup.
     *
     * @param key the key to search for
     * @param level0 the level-0 data files
     * @return whatever the delegate returns; may be {@code null}
     * @throws IOException if the underlying lookup fails
     */
    @Nullable
    private T lookupLevel0(InternalRow key, TreeSet<DataFileMeta> level0) throws IOException {
        return (T) LookupUtils.lookupLevel0(keyComparator, key, level0, this::lookup);
    }

    /**
     * Looks up {@code key} within one sorted run by delegating to {@link LookupUtils}, which is
     * given the key comparator and this class's per-file lookup.
     *
     * @param key the key to search for
     * @param level the sorted run to search
     * @return whatever the delegate returns; may be {@code null}
     * @throws IOException if the underlying lookup fails
     */
    @Nullable
    private T lookup(InternalRow key, SortedRun level) throws IOException {
        return (T) LookupUtils.lookup(keyComparator, key, level, this::lookup);
    }

    /**
     * Looks up {@code key} in the lookup file that corresponds to the data file {@code file}.
     *
     * <p>The lookup file is taken from the cache when present; otherwise it is built with
     * {@link #createLookupFile(DataFileMeta)}. A freshly built lookup file is put into the cache
     * after the read attempt, whether or not that read threw.
     *
     * @param key the key to search for
     * @param file the data file whose lookup file is queried
     * @return {@code null} when the lookup file holds no bytes for the key, otherwise the result
     *     of {@code valueProcessor.readFromDisk}
     * @throws IOException if building or reading the lookup file fails
     */
    @Nullable
    private T lookup(InternalRow key, DataFileMeta file) throws IOException {
        String fileName = file.fileName();

        LookupFile lookupFile = lookupFileCache.getIfPresent(fileName);
        boolean builtHere = lookupFile == null;
        if (builtHere) {
            lookupFile = createLookupFile(file);
        }

        final byte[] storedValue;
        try {
            storedValue = lookupFile.get(keySerializer.serializeToBytes(key));
        } finally {
            // Publish a newly built file to the cache even when the read above failed.
            if (builtHere) {
                lookupFileCache.put(fileName, lookupFile);
            }
        }

        return storedValue == null
                ? null
                : valueProcessor.readFromDisk(
                        key, lookupFile.remoteFile().level(), storedValue, fileName);
    }

    /**
     * Builds the local lookup file for the data file {@code file}.
     *
     * <p>Steps:
     *
     * <ol>
     *   <li>Obtain the local file from {@code localFileFactory} and create it; fail if it could
     *       not be newly created.
     *   <li>Open a store writer on it, with the bloom filter builder that {@code bfGenerator}
     *       yields for the file's row count.
     *   <li>Read every record of the data file and write the serialized key together with the
     *       bytes produced by {@code valueProcessor}. The positioned overload is used when
     *       {@code valueProcessor.withPosition()} is true.
     *   <li>Close the writer on every path and keep the context it returns.
     *   <li>Record the file name in {@code ownCachedFiles} and return a {@link LookupFile} whose
     *       callback removes that name again.
     * </ol>
     *
     * <p>If reading or writing throws an {@link IOException}, the local file is deleted and the
     * exception is rethrown.
     *
     * @param file the data file to index
     * @return the lookup file backed by the newly written local file
     * @throws IOException if the local file cannot be created, or reading/writing fails
     */
    private LookupFile createLookupFile(DataFileMeta file) throws IOException {
        File localFile = this.localFileFactory.apply(file.fileName());
        if (!localFile.createNewFile()) {
            throw new IOException("Can not create new file: " + localFile);
        }
        LookupStoreWriter kvWriter =
                this.lookupStoreFactory.createWriter(
                        localFile, this.bfGenerator.apply(file.rowCount()));
        LookupStoreFactory.Context context;
        try (RecordReader<KeyValue> reader = this.fileReaderFactory.apply(file)) {
            if (this.valueProcessor.withPosition()) {
                // Positioned mode: the iterator must expose the position of each returned row.
                FileRecordIterator batch;
                while ((batch = (FileRecordIterator) reader.readBatch()) != null) {
                    KeyValue kv;
                    while ((kv = (KeyValue) batch.next()) != null) {
                        byte[] keyBytes = this.keySerializer.serializeToBytes(kv.key());
                        byte[] valueBytes =
                                this.valueProcessor.persistToDisk(kv, batch.returnedPosition());
                        kvWriter.put(keyBytes, valueBytes);
                    }
                    batch.releaseBatch();
                }
            } else {
                RecordReader.RecordIterator<KeyValue> batch;
                while ((batch = reader.readBatch()) != null) {
                    KeyValue kv;
                    while ((kv = batch.next()) != null) {
                        byte[] keyBytes = this.keySerializer.serializeToBytes(kv.key());
                        byte[] valueBytes = this.valueProcessor.persistToDisk(kv);
                        kvWriter.put(keyBytes, valueBytes);
                    }
                    batch.releaseBatch();
                }
            }
        } catch (IOException e) {
            FileIOUtils.deleteFileOrDirectory(localFile);
            throw e;
        } finally {
            // Closed in finally so the writer is released on every path. The context returned by
            // close() is what the reader below is created from, so it must land in `context`.
            context = kvWriter.close();
        }
        this.ownCachedFiles.add(file.fileName());
        return new LookupFile(
                localFile,
                file,
                this.lookupStoreFactory.createReader(localFile, context),
                () -> this.ownCachedFiles.remove(file.fileName()));
    }

    /**
     * Invalidates, in the lookup file cache, every file name currently recorded in
     * {@code ownCachedFiles}.
     *
     * <p>The names are iterated from a snapshot copy rather than from the live set.
     * NOTE(review): presumably because invalidation can trigger the removal callback registered in
     * {@code createLookupFile}, which mutates {@code ownCachedFiles} - confirm against
     * {@code LookupFile}.
     *
     * @throws IOException declared by {@link Closeable}; nothing in this body throws it directly
     */
    @Override
    public void close() throws IOException {
        Set<String> snapshot = new HashSet<>(ownCachedFiles);
        snapshot.forEach(lookupFileCache::invalidate);
    }

    /**
     * Immutable holder for a lookup result that carries a row position: the name of the file,
     * the row position within it, and optionally the key-value itself.
     */
    public static class PositionedKeyValue {
        private final String fileName;
        private final long rowPosition;

        /** The key-value, or {@code null} when none was supplied at construction. */
        @Nullable
        private final KeyValue keyValue;

        /**
         * @param keyValue the key-value; may be {@code null}
         * @param fileName name of the file associated with this result
         * @param rowPosition row position associated with this result
         */
        public PositionedKeyValue(@Nullable KeyValue keyValue, String fileName, long rowPosition) {
            this.fileName = fileName;
            this.rowPosition = rowPosition;
            this.keyValue = keyValue;
        }

        /** Returns the file name given at construction. */
        public String fileName() {
            return fileName;
        }

        /** Returns the row position given at construction. */
        public long rowPosition() {
            return rowPosition;
        }

        /** Returns the key-value given at construction; may be {@code null}. */
        @Nullable
        public KeyValue keyValue() {
            return keyValue;
        }
    }

    /**
     * {@link ValueProcessor} that stores the row position of each record and, when
     * {@code persistValue} is set, the value, sequence number and row kind as well.
     *
     * <p>Layout with {@code persistValue}: serialized value bytes, then an 8-byte row position,
     * an 8-byte sequence number and a 1-byte row kind. Without it: only the row position in
     * variable-length encoding.
     */
    public static class PositionedKeyValueProcessor implements ValueProcessor<PositionedKeyValue> {
        /** Bytes appended after the serialized value: row position + sequence number + kind. */
        private static final int TRAILER_LENGTH = Long.BYTES + Long.BYTES + 1;

        /** Size of the scratch buffer used for the variable-length encoded row position. */
        private static final int VAR_LONG_BUFFER_LENGTH = 9;

        private final boolean persistValue;

        /** Serializer for values; {@code null} when values are not persisted. */
        private final RowCompactedSerializer valueSerializer;

        /**
         * @param valueType row type of the values; only used when {@code persistValue} is true
         * @param persistValue whether the value is stored alongside the row position
         */
        public PositionedKeyValueProcessor(RowType valueType, boolean persistValue) {
            this.persistValue = persistValue;
            if (persistValue) {
                this.valueSerializer = new RowCompactedSerializer(valueType);
            } else {
                this.valueSerializer = null;
            }
        }

        /** Always {@code true}: this processor needs the row position of every record. */
        @Override
        public boolean withPosition() {
            return true;
        }

        /** Not supported; the positioned overload must be used. */
        @Override
        public byte[] persistToDisk(KeyValue kv) {
            throw new UnsupportedOperationException();
        }

        /**
         * Encodes {@code kv} and its row position using the layout described on this class.
         *
         * @param kv the record to encode
         * @param rowPosition position of the record in its file
         * @return the encoded bytes
         */
        @Override
        public byte[] persistToDisk(KeyValue kv, long rowPosition) {
            if (!persistValue) {
                // Position only: encode into a scratch buffer and trim to the written length.
                byte[] scratch = new byte[VAR_LONG_BUFFER_LENGTH];
                int written = VarLengthIntUtils.encodeLong(scratch, rowPosition);
                return Arrays.copyOf(scratch, written);
            }

            byte[] valueBytes = valueSerializer.serializeToBytes(kv.value());
            int trailerStart = valueBytes.length;
            byte[] encoded = new byte[trailerStart + TRAILER_LENGTH];
            MemorySegment target = MemorySegment.wrap(encoded);
            target.put(0, valueBytes);
            target.putLong(trailerStart, rowPosition);
            target.putLong(trailerStart + Long.BYTES, kv.sequenceNumber());
            target.put(trailerStart + 2 * Long.BYTES, kv.valueKind().toByteValue());
            return encoded;
        }

        /**
         * Decodes bytes written by {@link #persistToDisk(KeyValue, long)}.
         *
         * @param key the key that was looked up
         * @param level level set on the rebuilt key-value
         * @param bytes the stored bytes
         * @param fileName name of the file the bytes belong to
         * @return a result carrying the row position and, when values are persisted, the rebuilt
         *     key-value (otherwise {@code null} in its place)
         */
        @Override
        public PositionedKeyValue readFromDisk(
                InternalRow key, int level, byte[] bytes, String fileName) {
            if (!persistValue) {
                long position = VarLengthIntUtils.decodeLong(bytes, 0);
                return new PositionedKeyValue(null, fileName, position);
            }

            InternalRow value = valueSerializer.deserialize(bytes);
            MemorySegment source = MemorySegment.wrap(bytes);
            int trailerStart = bytes.length - TRAILER_LENGTH;
            long position = source.getLong(trailerStart);
            long sequenceNumber = source.getLong(trailerStart + Long.BYTES);
            RowKind kind = RowKind.fromByteValue(bytes[bytes.length - 1]);
            KeyValue rebuilt =
                    new KeyValue().replace(key, sequenceNumber, kind, value).setLevel(level);
            return new PositionedKeyValue(rebuilt, fileName, position);
        }
    }

    /**
     * {@link ValueProcessor} for existence checks: it stores an empty byte array for every record
     * and reports {@code true} for every stored entry that is read back.
     */
    public static class ContainsValueProcessor implements ValueProcessor<Boolean> {
        /** Shared empty payload returned for every record. */
        private static final byte[] EMPTY_BYTES = new byte[0];

        /** Always {@code false}: row positions are not needed. */
        @Override
        public boolean withPosition() {
            return false;
        }

        /** Returns the shared empty array; nothing about {@code kv} is stored. */
        @Override
        public byte[] persistToDisk(KeyValue kv) {
            return EMPTY_BYTES;
        }

        /** Returns {@code true} regardless of the arguments. */
        @Override
        public Boolean readFromDisk(InternalRow key, int level, byte[] bytes, String fileName) {
            return true;
        }
    }

    /**
     * {@link ValueProcessor} that stores the full record.
     *
     * <p>Layout: serialized value bytes, then an 8-byte sequence number and a 1-byte row kind.
     */
    public static class KeyValueProcessor implements ValueProcessor<KeyValue> {
        /** Bytes appended after the serialized value: sequence number + row kind. */
        private static final int TRAILER_LENGTH = Long.BYTES + 1;

        private final RowCompactedSerializer valueSerializer;

        /** @param valueType row type of the values to serialize */
        public KeyValueProcessor(RowType valueType) {
            valueSerializer = new RowCompactedSerializer(valueType);
        }

        /** Always {@code false}: row positions are not needed. */
        @Override
        public boolean withPosition() {
            return false;
        }

        /**
         * Encodes the value, sequence number and row kind of {@code kv}.
         *
         * @param kv the record to encode
         * @return the encoded bytes in the layout described on this class
         */
        @Override
        public byte[] persistToDisk(KeyValue kv) {
            byte[] valueBytes = valueSerializer.serializeToBytes(kv.value());
            int trailerStart = valueBytes.length;
            byte[] encoded = new byte[trailerStart + TRAILER_LENGTH];
            MemorySegment target = MemorySegment.wrap(encoded);
            target.put(0, valueBytes);
            target.putLong(trailerStart, kv.sequenceNumber());
            target.put(trailerStart + Long.BYTES, kv.valueKind().toByteValue());
            return encoded;
        }

        /**
         * Rebuilds a key-value from bytes written by {@link #persistToDisk(KeyValue)}.
         *
         * @param key the key that was looked up
         * @param level level set on the rebuilt key-value
         * @param bytes the stored bytes
         * @param fileName not used by this implementation
         * @return the rebuilt key-value
         */
        @Override
        public KeyValue readFromDisk(InternalRow key, int level, byte[] bytes, String fileName) {
            InternalRow value = valueSerializer.deserialize(bytes);
            int trailerStart = bytes.length - TRAILER_LENGTH;
            long sequenceNumber = MemorySegment.wrap(bytes).getLong(trailerStart);
            RowKind kind = RowKind.fromByteValue(bytes[bytes.length - 1]);
            KeyValue rebuilt = new KeyValue();
            return rebuilt.replace(key, sequenceNumber, kind, value).setLevel(level);
        }
    }

    /**
     * Converts records to the bytes stored in a lookup file and converts stored bytes back into a
     * lookup result.
     *
     * @param <T> type of the lookup result
     */
    public interface ValueProcessor<T> {

        /**
         * Whether row positions are required. When this returns {@code true},
         * {@code LookupLevels#createLookupFile} calls {@link #persistToDisk(KeyValue, long)};
         * otherwise it calls {@link #persistToDisk(KeyValue)}.
         */
        boolean withPosition();

        /**
         * Encodes a record without a row position.
         *
         * @param kv the record to encode
         * @return the bytes to store for the record's key
         */
        byte[] persistToDisk(KeyValue kv);

        /**
         * Encodes a record together with its row position. The default implementation is
         * unsupported and always throws.
         *
         * @param kv the record to encode
         * @param rowPosition position of the record in its file
         * @return the bytes to store for the record's key
         * @throws UnsupportedOperationException unless overridden
         */
        default byte[] persistToDisk(KeyValue kv, long rowPosition) {
            throw new UnsupportedOperationException();
        }

        /**
         * Decodes stored bytes into a lookup result.
         *
         * @param key the key that was looked up
         * @param level level of the data file the bytes came from
         * @param bytes the stored bytes
         * @param fileName name of the data file the bytes came from
         * @return the lookup result
         */
        T readFromDisk(InternalRow key, int level, byte[] bytes, String fileName);
    }
}

