/*
 * Decompiled with CFR 0.152.
 */
package com.indeed.lsmtree.recordlog;

import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.io.ByteStreams;
import com.google.common.io.CountingInputStream;
import com.indeed.lsmtree.recordlog.MemoryRandomAccessDataInput;
import com.indeed.lsmtree.recordlog.RecordFile;
import com.indeed.util.compress.BlockDecompressorStream;
import com.indeed.util.compress.CompressionCodec;
import com.indeed.util.compress.Decompressor;
import com.indeed.util.compress.SnappyCodec;
import com.indeed.util.core.Either;
import com.indeed.util.core.io.Closeables2;
import com.indeed.util.core.reference.SharedReference;
import com.indeed.util.io.BufferedFileDataOutputStream;
import com.indeed.util.io.RandomAccessDataInput;
import com.indeed.util.io.SyncableDataOutput;
import com.indeed.util.io.UnsafeByteArrayOutputStream;
import com.indeed.util.io.VIntUtils;
import com.indeed.util.mmap.DirectMemory;
import com.indeed.util.mmap.HeapMemory;
import com.indeed.util.mmap.MMapBuffer;
import com.indeed.util.mmap.Memory;
import com.indeed.util.mmap.MemoryDataInput;
import com.indeed.util.serialization.Serializer;
import fj.data.Option;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;
import javax.annotation.Nullable;
import org.apache.log4j.Logger;

public final class BlockCompressedRecordFile<E>
implements RecordFile<E> {
    private static final Logger log = Logger.getLogger(BlockCompressedRecordFile.class);
    // Path of the backing file; only used to build error and log messages.
    private final String file;
    // Deserializes a single record from the bytes of a decompressed block.
    private final Serializer<E> serializer;
    // Creates decompressors when the decompressor pool has none available.
    private final CompressionCodec codec;
    // Block size hint; twice this value is used to size the decompression buffers.
    private final int blockSize;
    // Blocks end on multiples of (1 << padBits) bytes.
    private final int padBits;
    // Supplies a fresh random access view of the file for every block load.
    private final Supplier<? extends Either<IOException, ? extends RandomAccessDataInput>> inputSupplier;
    // Cache of decompressed blocks keyed by block start address.
    private final BlockCache blockCache;
    // max(recordIndexBits - padBits, 0): a record address shifted right by this (then masked with padMask) is its block address.
    private final int shift;
    // Low recordIndexBits bits of a record address: the index of the record inside its block.
    private final long mask;
    // 1 << padBits.
    private final int pad;
    // ~(pad - 1): clears the bits of an address below the padding boundary.
    private final long padMask;
    // Upper bound used to sanity check the first chunk length of a block; the check is skipped when this is <= 0.
    private final int maxChunkSize;
    // Shared handle on the underlying resource; every reader gets its own copy.
    private final SharedReference<Closeable> closeableRef;
    // Incremented by open() and decremented when the closeable created there is closed.
    private static final AtomicLong openFileCounter = new AtomicLong(0L);

    /**
     * Reports the current value of the open-file counter maintained by {@code open}.
     *
     * @return the counter's current value
     */
    public static long getOpenFileCount() {
        return openFileCounter.longValue();
    }

    /**
     * Opens a record file backed by a read-only, big-endian memory mapping of {@code file}.
     *
     * <p>The mapping is handed to the new instance wrapped in a {@link Closeable} that
     * decrements the open-file counter and closes the mapping.
     *
     * @param file             file to map
     * @param serializer       deserializer for individual records
     * @param codec            codec used to create decompressors when the pool is empty
     * @param decompressorPool pool decompressors are taken from and offered back to
     * @param blockSize        block size hint used to size decompression buffers
     * @param recordIndexBits  number of low address bits holding the record index within a block
     * @param padBits          blocks end on multiples of {@code 1 << padBits} bytes
     * @param mlockFiles       if true the whole mapping is mlock'ed before it is used
     * @param maxChunkSize     sanity limit for the first chunk length of a block; {@code <= 0} disables the check
     * @return the opened record file
     * @throws IOException if mapping the file or constructing the record file fails
     */
    public static <E> BlockCompressedRecordFile<E> open(File file, Serializer<E> serializer, CompressionCodec codec, BlockingQueue<Decompressor> decompressorPool, int blockSize, int recordIndexBits, int padBits, boolean mlockFiles, int maxChunkSize) throws IOException {
        final MMapBuffer buffer = new MMapBuffer(file, FileChannel.MapMode.READ_ONLY, ByteOrder.BIG_ENDIAN);
        try {
            if (mlockFiles) {
                buffer.mlock(0L, buffer.memory().length());
            }
            final Memory memory = buffer.memory();
            // Every block load gets its own positioned view over the same mapped memory.
            final Supplier<Either<IOException, MemoryRandomAccessDataInput>> inputSupplier = new Supplier<Either<IOException, MemoryRandomAccessDataInput>>() {
                @Override
                public Either<IOException, MemoryRandomAccessDataInput> get() {
                    return Either.Right.of(new MemoryRandomAccessDataInput(memory));
                }
            };
            final Closeable closeable = new Closeable() {
                @Override
                public void close() throws IOException {
                    openFileCounter.decrementAndGet();
                    buffer.close();
                }
            };
            openFileCounter.incrementAndGet();
            return new BlockCompressedRecordFile<E>(inputSupplier, closeable, file.getAbsolutePath(), serializer, codec, decompressorPool, blockSize, recordIndexBits, padBits, maxChunkSize);
        }
        catch (Throwable t) {
            // NOTE(review): the constructor closes its shared reference (and with it the buffer)
            // when block cache creation fails, so this may close the buffer a second time — confirm
            // MMapBuffer.close() tolerates that.
            Closeables2.closeQuietly(buffer, log);
            Throwables.propagateIfInstanceOf(t, IOException.class);
            throw Throwables.propagate(t);
        }
    }

    /**
     * Reads the metadata stored near the end of a record file.
     *
     * <p>The int located 12 bytes before the end of the file is read as the metadata length.
     * {@code Integer.MAX_VALUE} there means the file carries no metadata; otherwise the
     * metadata bytes immediately precede that int.
     *
     * @param file record file to inspect
     * @return the metadata bytes, or {@code null} if the file has no metadata
     * @throws IOException if the file cannot be mapped, is shorter than 12 bytes, or declares a
     *                     metadata length that does not fit inside the file
     */
    @Nullable
    public static byte[] getMetadata(File file) throws IOException {
        final long length = file.length();
        if (length < 12L) {
            throw new IOException("file " + file + " is too short (" + length + " bytes) to contain a record file trailer");
        }
        final MMapBuffer buffer = new MMapBuffer(file, 0L, length, FileChannel.MapMode.READ_ONLY, ByteOrder.BIG_ENDIAN);
        try {
            final DirectMemory memory = buffer.memory();
            final long lengthFieldAddress = length - 12L;
            final int metadataLength = memory.getInt(lengthFieldAddress);
            if (metadataLength == Integer.MAX_VALUE) {
                return null;
            }
            if (metadataLength < 0 || (long)metadataLength > lengthFieldAddress) {
                throw new IOException("metadata length " + metadataLength + " in file " + file + " is invalid for a file of " + length + " bytes");
            }
            final byte[] metadata = new byte[metadataLength];
            memory.getBytes(lengthFieldAddress - (long)metadataLength, metadata);
            return metadata;
        }
        finally {
            // The mapping is only needed for this lookup; release it on every path.
            buffer.close();
        }
    }

    public BlockCompressedRecordFile(Supplier<? extends Either<IOException, ? extends RandomAccessDataInput>> inputSupplier, Closeable closeable, String file, Serializer<E> serializer, CompressionCodec codec, BlockingQueue<Decompressor> decompressorPool, int blockSize, int recordIndexBits, int padBits, int maxChunkSize) throws IOException {
        this.inputSupplier = inputSupplier;
        this.file = file;
        this.serializer = serializer;
        this.codec = codec;
        this.blockSize = blockSize;
        this.padBits = padBits;
        this.maxChunkSize = maxChunkSize;
        this.pad = 1 << padBits;
        this.padMask = (long)(this.pad - 1) ^ 0xFFFFFFFFFFFFFFFFL;
        this.shift = Math.max(recordIndexBits - padBits, 0);
        this.mask = (1L << recordIndexBits) - 1L;
        this.closeableRef = SharedReference.create((Closeable)closeable);
        try {
            this.blockCache = new BlockCache(decompressorPool);
        }
        catch (Throwable t) {
            Closeables2.closeQuietly(this.closeableRef, (Logger)log);
            Throwables.propagateIfInstanceOf((Throwable)t, IOException.class);
            throw Throwables.propagate((Throwable)t);
        }
    }

    /**
     * Reads the record stored at {@code address}.
     *
     * @param address record address: the record index in the low bits selected by {@code mask},
     *                the block address in the bits above {@code shift}
     * @return the deserialized record
     * @throws IOException if the block cannot be loaded, no block exists at the address, or the
     *                     record index is beyond the last record of the block
     */
    @Override
    public E get(long address) throws IOException {
        final int recordIndex = (int)(address & mask);
        final long blockAddress = (address >>> shift) & padMask;

        final Option<BlockCacheEntry> maybeBlock = blockCache.get(blockAddress).get();
        if (maybeBlock.isNone()) {
            throw new IOException("illegal address " + address + " in file " + file);
        }

        final BlockCacheEntry entry = maybeBlock.some();
        if (recordIndex >= entry.size()) {
            throw new IOException("there are only " + entry.size() + " in block at address " + blockAddress + ", seek request is for record number " + recordIndex);
        }

        final Memory recordBytes = entry.get(recordIndex);
        return serializer.read(new MemoryDataInput(recordBytes));
    }

    /**
     * Creates a reader that starts at the first record of the file.
     *
     * @return a reader holding its own copy of the shared reference; close it when done
     * @throws IOException declared by the reader constructor
     */
    @Override
    public RecordFile.Reader<E> reader() throws IOException {
        final SharedReference<Closeable> readerRef = closeableRef.copy();
        return new Reader(readerRef);
    }

    /**
     * Creates a reader positioned so that its first {@code next()} yields the record at
     * {@code address}.
     *
     * <p>The reader receives its own copy of the shared reference. If the reader cannot be
     * constructed (invalid address, block load failure) that copy is closed here, so a failed
     * seek does not keep the underlying resource open.
     *
     * @param address record address to start from
     * @return a reader positioned at {@code address}; close it when done
     * @throws IOException if the block cannot be loaded or the address does not denote a record
     */
    @Override
    public RecordFile.Reader<E> reader(long address) throws IOException {
        final SharedReference<Closeable> readerRef = closeableRef.copy();
        boolean constructed = false;
        try {
            final Reader seekingReader = new Reader(readerRef, address);
            constructed = true;
            return seekingReader;
        }
        finally {
            if (!constructed) {
                try {
                    readerRef.close();
                }
                catch (IOException ignored) {
                    // The constructor failure is the error the caller needs to see; a secondary
                    // failure while releasing the copied reference must not replace it.
                }
            }
        }
    }

    /**
     * Closes this record file's own handle on the shared reference.
     *
     * @throws IOException if closing the reference fails
     */
    @Override
    public void close() throws IOException {
        closeableRef.close();
    }

    /**
     * Fluent builder for {@link BlockCompressedRecordFile} instances opened from a file.
     *
     * <p>File, serializer and codec are mandatory; every other setting has a default.
     */
    public static final class Builder<E> {
        private File file;
        private Serializer<E> serializer;
        private CompressionCodec codec;
        /** When left null, {@link #build()} installs a new unbounded queue. */
        private BlockingQueue<Decompressor> decompressorPool = null;
        private int blockSize = 16 * 1024;
        private int recordIndexBits = 10;
        private int padBits = 6;
        private boolean mlockFiles = false;
        private int maxChunkSize = 128 * 1024 * 1024;

        /**
         * @param file       file to open
         * @param serializer deserializer for individual records
         * @param codec      compression codec the file was written with
         */
        public Builder(File file, Serializer<E> serializer, CompressionCodec codec) {
            this.file = file;
            this.serializer = serializer;
            this.codec = codec;
        }

        /** Replaces the file to open. */
        public Builder<E> setFile(File file) {
            this.file = file;
            return this;
        }

        /** Replaces the record serializer. */
        public Builder<E> setSerializer(Serializer<E> serializer) {
            this.serializer = serializer;
            return this;
        }

        /** Replaces the compression codec. */
        public Builder<E> setCodec(CompressionCodec codec) {
            this.codec = codec;
            return this;
        }

        /** Sets the pool decompressors are borrowed from. */
        public Builder<E> setDecompressorPool(BlockingQueue<Decompressor> decompressorPool) {
            this.decompressorPool = decompressorPool;
            return this;
        }

        /** Sets the block size hint (default 16384). */
        public Builder<E> setBlockSize(int blockSize) {
            this.blockSize = blockSize;
            return this;
        }

        /** Sets the number of address bits used for the record index (default 10). */
        public Builder<E> setRecordIndexBits(int recordIndexBits) {
            this.recordIndexBits = recordIndexBits;
            return this;
        }

        /** Sets the number of padding bits (default 6). */
        public Builder<E> setPadBits(int padBits) {
            this.padBits = padBits;
            return this;
        }

        /** Chooses whether the mapped file is mlock'ed (default false). */
        public Builder<E> setMlockFiles(boolean mlockFiles) {
            this.mlockFiles = mlockFiles;
            return this;
        }

        /** Sets the sanity limit for the first chunk length of a block (default 2^27). */
        public Builder<E> setMaxChunkSize(int maxChunkSize) {
            this.maxChunkSize = maxChunkSize;
            return this;
        }

        /**
         * Opens the record file with the current settings.
         *
         * @return the opened record file
         * @throws IOException if opening the file fails
         */
        public BlockCompressedRecordFile<E> build() throws IOException {
            if (decompressorPool == null) {
                decompressorPool = new LinkedBlockingQueue<Decompressor>();
            }
            return BlockCompressedRecordFile.open(
                    file,
                    serializer,
                    codec,
                    decompressorPool,
                    blockSize,
                    recordIndexBits,
                    padBits,
                    mlockFiles,
                    maxChunkSize
            );
        }
    }

    /**
     * A decompressed block: the concatenated record bytes plus the offset table that
     * delimits the individual records.
     */
    private static final class BlockCacheEntry {
        /** Start offset of every record, followed by one final entry holding the end of the last record. */
        private final int[] recordOffsets;
        private final Memory block;
        private final long nextBlockStartAddress;

        public BlockCacheEntry(int[] recordOffsets, Memory block, long nextBlockStartAddress) {
            this.block = block;
            this.recordOffsets = recordOffsets;
            this.nextBlockStartAddress = nextBlockStartAddress;
        }

        /** @return the number of records in this block (one less than the number of offsets) */
        public int size() {
            final int offsetCount = recordOffsets.length;
            return offsetCount - 1;
        }

        /**
         * @param index zero-based record index within the block
         * @return a slice of the block covering exactly that record
         */
        public Memory get(int index) {
            final int end = recordOffsets[index + 1];
            final int start = recordOffsets[index];
            return block.slice((long)start, (long)(end - start));
        }

        /** @return the address recorded at construction as the start of the following block */
        public long getNextBlockStartAddress() {
            return nextBlockStartAddress;
        }
    }

    /**
     * Loads, decompresses and caches blocks keyed by their start address.
     *
     * <p>Each cached value is either the IOException hit while loading the block, or an
     * Option that is empty when the end-of-blocks marker was found at the address.
     */
    private final class BlockCache {
        // Built with weakValues() in the constructor below.
        private final LoadingCache<Long, Either<IOException, Option<BlockCacheEntry>>> cache;
        // Decompressors are polled from here before a new one is created, and offered back after use.
        private final BlockingQueue<Decompressor> decompressorPool;
        private final CacheLoader<Long, Either<IOException, Option<BlockCacheEntry>>> readBlock = new CacheLoader<Long, Either<IOException, Option<BlockCacheEntry>>>(){

            /*
             * Decompiler note: WARNING - Removed try catching itself - possible behaviour change.
             *
             * Reads the block starting at blockAddress. IOExceptions are logged and returned
             * as the Left value instead of being thrown.
             */
            public Either<IOException, Option<BlockCacheEntry>> load(Long blockAddress) {
                Either input = (Either)BlockCompressedRecordFile.this.inputSupplier.get();
                RandomAccessDataInput in = null;
                try {
                    in = (RandomAccessDataInput)input.get();
                    in.seek(blockAddress.longValue());
                    // Block header, first int: length of the compressed data.
                    int blockLength = in.readInt();
                    // Integer.MAX_VALUE is what Writer.close() writes after the last block.
                    if (blockLength == Integer.MAX_VALUE) {
                        Either either = Either.Right.of((Object)Option.none());
                        return either;
                    }
                    if (blockLength < 4) {
                        throw new IOException("block length for block at address " + blockAddress + " in file " + BlockCompressedRecordFile.this.file + " is " + blockLength + " which is less than 4. this is not possible. this address is probably no good.");
                    }
                    // End of the block (8 header bytes + compressed data) rounded up to the next multiple of 1 << padBits.
                    long blockEnd = (blockAddress + 8L + (long)blockLength - 1L >>> BlockCompressedRecordFile.this.padBits) + 1L << BlockCompressedRecordFile.this.padBits;
                    // The last 12 bytes of the input are never block data — presumably the int + long trailer written by Writer.close().
                    long maxAddress = in.length() - 12L;
                    // NOTE(review): blockLength < 0 can no longer be true here, the < 4 check above already rejected it.
                    if (blockLength < 0 || blockEnd > maxAddress) {
                        throw new IOException("block address " + blockAddress + " in file " + BlockCompressedRecordFile.this.file + " is no good, length is " + blockLength + " and end of data is " + maxAddress);
                    }
                    // Block header, second int: checksum compared against the Adler32 value further down.
                    int checksum = in.readInt();
                    if (BlockCompressedRecordFile.this.maxChunkSize > 0) {
                        // Peek at the first int of the compressed data as a chunk length and sanity check it,
                        // then seek back to the start of the compressed data.
                        int chunkLength = in.readInt();
                        if (chunkLength > BlockCompressedRecordFile.this.maxChunkSize) {
                            throw new IOException("first chunk length (" + chunkLength + ") for block at address " + blockAddress + " in file " + BlockCompressedRecordFile.this.file + " is greater than " + BlockCompressedRecordFile.this.maxChunkSize + ". while this may be correct it is extremely unlikely and this is probably a bad address.");
                        }
                        in.seek(blockAddress + 8L);
                    }
                    byte[] compressedBytes = new byte[blockLength];
                    in.readFully(compressedBytes);
                    // Skip the padding so that in.position() ends up at the start of the next block.
                    int padLength = (int)((long)BlockCompressedRecordFile.this.pad - in.position() % (long)BlockCompressedRecordFile.this.pad);
                    if (padLength != BlockCompressedRecordFile.this.pad) {
                        in.seek(in.position() + (long)padLength);
                    }
                    // The checksum accumulates over the compressed bytes as the decompressor stream reads them.
                    CheckedInputStream checksumStream = new CheckedInputStream(new ByteArrayInputStream(compressedBytes), new Adler32());
                    Decompressor decompressor = (Decompressor)BlockCache.this.decompressorPool.poll();
                    if (decompressor == null) {
                        decompressor = BlockCompressedRecordFile.this.codec.createDecompressor();
                    }
                    decompressor.reset();
                    BlockDecompressorStream decompressed = new BlockDecompressorStream((InputStream)checksumStream, decompressor, BlockCompressedRecordFile.this.blockSize * 2);
                    ByteArrayOutputStream decompressedByteStream = new ByteArrayOutputStream(BlockCompressedRecordFile.this.blockSize * 2);
                    ByteStreams.copy((InputStream)decompressed, (OutputStream)decompressedByteStream);
                    decompressed.close();
                    decompressedByteStream.close();
                    // NOTE(review): the decompressor is only offered back on this path; if decompression throws it is not returned to the pool.
                    BlockCache.this.decompressorPool.offer(decompressor);
                    if ((int)checksumStream.getChecksum().getValue() != checksum) {
                        throw new IOException("checksum for chunk at block address " + blockAddress + " does not match data");
                    }
                    byte[] decompressedBytes = decompressedByteStream.toByteArray();
                    CountingInputStream counter = new CountingInputStream((InputStream)new ByteArrayInputStream(decompressedBytes));
                    DataInputStream dataInput = new DataInputStream((InputStream)counter);
                    // Decompressed layout: int record count, one vint length per record, then the record bytes.
                    int numRecords = dataInput.readInt();
                    int[] recordOffsets = new int[numRecords + 1];
                    int sum = 0;
                    // Turn the per-record lengths into running start offsets.
                    for (int i = 0; i < numRecords; ++i) {
                        recordOffsets[i] = sum;
                        int delta = VIntUtils.readVInt((InputStream)dataInput);
                        sum += delta;
                    }
                    // Final entry: end offset of the last record.
                    recordOffsets[numRecords] = sum;
                    // Number of bytes consumed by the count and the length table.
                    int count = (int)counter.getCount();
                    dataInput.close();
                    // Keep only the record bytes that follow the table.
                    byte[] block = new byte[decompressedBytes.length - count];
                    System.arraycopy(decompressedBytes, count, block, 0, block.length);
                    // in.position() is past the padding at this point, i.e. the start address of the next block.
                    Either either = Either.Right.of((Object)Option.some((Object)new BlockCacheEntry(recordOffsets, (Memory)new HeapMemory(block, ByteOrder.BIG_ENDIAN), in.position())));
                    return either;
                }
                catch (IOException e) {
                    // Failures are logged at info level and handed back as a value.
                    log.info((Object)("error reading block at address " + blockAddress + " in file " + BlockCompressedRecordFile.this.file), (Throwable)e);
                    Either either = Either.Left.of((Throwable)e);
                    return either;
                }
                finally {
                    // in is still null here if obtaining the input failed.
                    Closeables2.closeQuietly((Closeable)in, (Logger)log);
                }
            }
        };

        /**
         * @param decompressorPool pool used by the loader to borrow and return decompressors
         */
        public BlockCache(BlockingQueue<Decompressor> decompressorPool) throws IOException {
            this.decompressorPool = decompressorPool;
            this.cache = CacheBuilder.newBuilder().weakValues().build(this.readBlock);
        }

        /**
         * Returns the cached result for the block at the given address, loading it through
         * readBlock if necessary.
         */
        public Either<IOException, Option<BlockCacheEntry>> get(Long key) {
            return (Either)this.cache.getUnchecked((Object)key);
        }
    }

    /**
     * Sequential reader over the records of the enclosing file. It walks the blocks in file
     * order by following each block's next-block address.
     */
    private final class Reader implements RecordFile.Reader<E> {
        /** Address of the record most recently returned by {@link #next()}. */
        private long position;
        /** Record most recently returned by {@link #next()}. */
        private E current;
        /** Index within the current block of the record the next call will read. */
        private int currentRecord = 0;
        private Option<BlockCacheEntry> currentBlock;
        private long blockAddress = 0L;
        private boolean done = false;
        /** False until the first block has been fetched. */
        private boolean initialized = false;
        private final SharedReference<Closeable> ref;

        /**
         * Creates a reader that starts at the first record; the first block is fetched lazily
         * by the first call to {@link #next()}.
         */
        public Reader(SharedReference<Closeable> ref) throws IOException {
            this.ref = ref;
        }

        /**
         * Creates a reader whose first {@link #next()} returns the record at {@code seekAddress}.
         *
         * @throws IOException if the block cannot be loaded, does not exist, or has fewer
         *                     records than the address asks for
         */
        public Reader(SharedReference<Closeable> ref, long seekAddress) throws IOException {
            this.ref = ref;
            this.initialized = true;

            final long targetBlock = (seekAddress >>> shift) & padMask;
            this.currentBlock = fetchBlock(targetBlock);
            this.blockAddress = targetBlock;
            if (currentBlock.isNone()) {
                this.done = true;
                throw new IOException("address " + seekAddress + " is invalid because block does not exist in file " + file);
            }

            final BlockCacheEntry entry = currentBlock.some();
            this.currentRecord = (int)(seekAddress & mask);
            if (currentRecord >= entry.size()) {
                this.done = true;
                throw new IOException("there are only " + entry.size() + " in block at address " + targetBlock + ", seek request is for record number " + currentRecord);
            }
        }

        /** Looks up the block at {@code address} in the enclosing file's cache, rethrowing a cached load failure. */
        private Option<BlockCacheEntry> fetchBlock(long address) throws IOException {
            return blockCache.get(address).get();
        }

        /**
         * Advances to the next record.
         *
         * @return true if a record was read, false once the end of the file has been reached
         * @throws IOException if a block cannot be loaded or the record cannot be deserialized
         */
        @Override
        public boolean next() throws IOException {
            if (!initialized) {
                // Lazy start: the first block lives at address 0.
                currentBlock = fetchBlock(0L);
                if (currentBlock.isNone()) {
                    done = true;
                }
                initialized = true;
            }
            if (done) {
                return false;
            }

            BlockCacheEntry entry = currentBlock.some();
            final boolean blockExhausted = currentRecord == entry.size();
            if (blockExhausted) {
                blockAddress = entry.getNextBlockStartAddress();
                currentBlock = fetchBlock(blockAddress);
                currentRecord = 0;
                if (currentBlock.isNone()) {
                    done = true;
                    return false;
                }
                entry = currentBlock.some();
            }

            position = (blockAddress << shift) + (long)currentRecord;
            final Memory recordBytes = entry.get(currentRecord);
            current = serializer.read(new MemoryDataInput(recordBytes));
            ++currentRecord;
            return true;
        }

        /** @return the address of the record returned by the last successful {@link #next()} */
        @Override
        public long getPosition() {
            return position;
        }

        /** @return the record returned by the last successful {@link #next()} */
        @Override
        public E get() {
            return current;
        }

        /** Closes this reader's copy of the shared reference. */
        @Override
        public void close() throws IOException {
            ref.close();
        }
    }

    /**
     * Appends records to a block compressed record file.
     *
     * <p>Records are buffered uncompressed until the buffer reaches the block size or the
     * block holds the maximum number of records, then written as one compressed block:
     * compressed length (int), Adler32 checksum of the compressed bytes (int), the compressed
     * bytes, and zero padding up to the next multiple of {@code 1 << padBits}.
     * Closing the writer appends a trailer: {@code Integer.MAX_VALUE}, optional metadata
     * followed by its length, and the final file length as a long.
     */
    public static final class Writer<E>
    implements RecordFile.Writer<E> {
        private final SyncableDataOutput out;
        /** Serialized length of every record in the block currently being built. */
        private final int[] lengthBuffer;
        private final UnsafeByteArrayOutputStream currentBlockBytes;
        private final DataOutputStream currentBlockOut;
        private int numRecords = 0;
        /** Start position of the current block shifted left by {@code shift}; record addresses add the record index. */
        private long blockAddress = 0L;
        private final Serializer<E> serializer;
        private final CompressionCodec codec;
        private final int blockSize;
        private final int shift;
        private final int pad;
        /** Set once a close has been attempted, so that later close calls are no-ops. */
        private boolean closed = false;

        /**
         * Opens {@code file} for writing and wraps it in a writer.
         *
         * <p>If the writer cannot be constructed the freshly opened stream is closed again
         * instead of being leaked.
         *
         * @throws FileNotFoundException if the file cannot be opened for writing
         * @throws IllegalArgumentException if {@code blockSize} is rejected by the constructor
         */
        public static <E> Writer<E> open(File file, Serializer<E> serializer, CompressionCodec codec, int blockSize, int recordIndexBits, int padBits) throws FileNotFoundException {
            final BufferedFileDataOutputStream out = new BufferedFileDataOutputStream(file, ByteOrder.BIG_ENDIAN, 16384);
            boolean constructed = false;
            try {
                final Writer<E> writer = new Writer<E>(out, serializer, codec, blockSize, recordIndexBits, padBits);
                constructed = true;
                return writer;
            }
            finally {
                if (!constructed) {
                    try {
                        out.close();
                    }
                    catch (IOException ignored) {
                        // The constructor failure is what the caller needs to see.
                    }
                }
            }
        }

        /**
         * @param out             destination of the file contents
         * @param serializer      serializer for individual records
         * @param codec           codec used to compress each block
         * @param blockSize       uncompressed size at which a block is flushed; at most 2^24
         * @param recordIndexBits a block holds at most {@code 1 << recordIndexBits} records
         * @param padBits         blocks are padded to multiples of {@code 1 << padBits} bytes
         * @throws IllegalArgumentException if {@code blockSize} exceeds 2^24
         */
        public Writer(SyncableDataOutput out, Serializer<E> serializer, CompressionCodec codec, int blockSize, int recordIndexBits, int padBits) {
            if (blockSize > 0x1000000) {
                throw new IllegalArgumentException("block size must be less than 2^24");
            }
            this.out = out;
            this.lengthBuffer = new int[1 << recordIndexBits];
            this.currentBlockBytes = new UnsafeByteArrayOutputStream(blockSize);
            this.currentBlockOut = new DataOutputStream(this.currentBlockBytes);
            this.pad = 1 << padBits;
            this.shift = Math.max(recordIndexBits - padBits, 0);
            this.serializer = serializer;
            this.codec = codec;
            this.blockSize = blockSize;
        }

        /**
         * Appends a record to the current block, flushing the previous block first if it is full.
         *
         * @param entry record to append
         * @return the address of the record
         * @throws IOException if serializing the record or flushing a block fails
         */
        @Override
        public long append(E entry) throws IOException {
            final boolean sizeLimitReached = this.currentBlockBytes.size() >= this.blockSize && this.numRecords > 0;
            final boolean recordLimitReached = this.numRecords == this.lengthBuffer.length;
            if (sizeLimitReached || recordLimitReached) {
                this.flushBuffer();
            }
            final int start = this.currentBlockBytes.size();
            this.serializer.write(entry, this.currentBlockOut);
            this.lengthBuffer[this.numRecords] = this.currentBlockBytes.size() - start;
            final long address = this.blockAddress + (long)this.numRecords;
            ++this.numRecords;
            return address;
        }

        /** Compresses the buffered records into one block, writes it with its header and padding, and starts a new block. */
        private void flushBuffer() throws IOException {
            final UnsafeByteArrayOutputStream compressedBuffer = new UnsafeByteArrayOutputStream(this.blockSize + 4 * this.numRecords);
            // The checksum covers the compressed bytes, i.e. what the codec stream writes.
            final CheckedOutputStream checksumStream = new CheckedOutputStream(compressedBuffer, new Adler32());
            final DataOutputStream compressorStream = new DataOutputStream(this.codec.createOutputStream(checksumStream));
            // Uncompressed layout: record count, one vint length per record, then the record bytes.
            compressorStream.writeInt(this.numRecords);
            for (int i = 0; i < this.numRecords; ++i) {
                VIntUtils.writeVInt(compressorStream, this.lengthBuffer[i]);
            }
            compressorStream.write(this.currentBlockBytes.getByteArray(), 0, this.currentBlockBytes.size());
            compressorStream.close();
            // Block header: compressed length, then checksum.
            this.out.writeInt(compressedBuffer.size());
            final int checksum = (int)checksumStream.getChecksum().getValue();
            this.out.writeInt(checksum);
            this.out.write(compressedBuffer.getByteArray(), 0, compressedBuffer.size());
            this.currentBlockBytes.reset();
            this.numRecords = 0;
            // Zero-pad so the next block starts on a multiple of pad.
            final int padLength = (int)((long)this.pad - this.out.position() % (long)this.pad);
            if (padLength != this.pad) {
                for (int i = 0; i < padLength; ++i) {
                    this.out.writeByte(0);
                }
            }
            this.blockAddress = this.out.position() << this.shift;
        }

        /**
         * Flushes pending records, writes the trailer without metadata, syncs and closes the
         * output. Calling it again after a close attempt has no effect.
         *
         * @throws IOException if writing, syncing or closing fails
         */
        @Override
        public void close() throws IOException {
            this.finish(null);
        }

        /**
         * Like {@link #close()} but stores {@code metadata} in the trailer.
         *
         * @param metadata bytes to store; must not be null
         * @throws NullPointerException if {@code metadata} is null (nothing is written in that case)
         * @throws IOException if writing, syncing or closing fails
         */
        public void close(byte[] metadata) throws IOException {
            if (metadata == null) {
                throw new NullPointerException("metadata");
            }
            this.finish(metadata);
        }

        /**
         * Writes the trailer and closes the output exactly once.
         *
         * @param metadata metadata to embed in the trailer, or null for none
         */
        private void finish(byte[] metadata) throws IOException {
            if (this.closed) {
                return;
            }
            this.closed = true;
            boolean trailerWritten = false;
            try {
                if (this.numRecords > 0) {
                    this.flushBuffer();
                }
                // End-of-blocks marker, read by the block loader in place of a block length.
                this.out.writeInt(Integer.MAX_VALUE);
                if (metadata != null) {
                    this.out.write(metadata);
                    this.out.writeInt(metadata.length);
                }
                // The long written here equals the final length of the file.
                this.out.writeLong(this.out.position() + 8L);
                this.out.sync();
                trailerWritten = true;
            }
            finally {
                if (trailerWritten) {
                    this.out.close();
                }
                else {
                    // Writing failed: still release the output, but let the original failure propagate.
                    try {
                        this.out.close();
                    }
                    catch (IOException ignored) {
                        // Secondary failure while cleaning up; the first exception is the relevant one.
                    }
                }
            }
        }

        /**
         * Flushes pending records as a block and syncs the output.
         *
         * @throws IOException if flushing or syncing fails
         */
        public void sync() throws IOException {
            if (this.numRecords > 0) {
                this.flushBuffer();
            }
            this.out.sync();
        }

        /**
         * Builder for {@link Writer} instances writing to a file. The codec defaults to
         * {@code SnappyCodec} when none is set.
         */
        public static final class Builder<E> {
            private final File file;
            private final Serializer<E> serializer;
            private CompressionCodec codec;
            private int blockSize = 16384;
            private int recordIndexBits = 10;
            private int padBits = 6;

            /**
             * @param file       file to write
             * @param serializer serializer for individual records
             */
            public Builder(File file, Serializer<E> serializer) {
                this.file = file;
                this.serializer = serializer;
            }

            /** Sets the compression codec. */
            public void setCodec(CompressionCodec codec) {
                this.codec = codec;
            }

            /** Sets the block size (default 16384). */
            public void setBlockSize(int blockSize) {
                this.blockSize = blockSize;
            }

            /** Sets the number of address bits used for the record index (default 10). */
            public void setRecordIndexBits(int recordIndexBits) {
                this.recordIndexBits = recordIndexBits;
            }

            /** Sets the number of padding bits (default 6). */
            public void setPadBits(int padBits) {
                this.padBits = padBits;
            }

            /**
             * Opens the writer with the current settings.
             *
             * @throws IOException if the file cannot be opened
             */
            public Writer<E> build() throws IOException {
                if (this.codec == null) {
                    this.codec = new SnappyCodec();
                }
                return Writer.open(this.file, this.serializer, this.codec, this.blockSize, this.recordIndexBits, this.padBits);
            }
        }
    }
}

