/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexSpilledRegionManager;
import org.apache.flink.runtime.io.network.partition.hybrid.InternalRegionWriteReadUtils;
import org.apache.flink.util.ExceptionUtils;

public class HsFileDataIndexSpilledRegionManagerImpl
implements HsFileDataIndexSpilledRegionManager {
    private final ByteBuffer regionHeaderBuffer = InternalRegionWriteReadUtils.allocateAndConfigureBuffer(16);
    private final List<TreeMap<Integer, SegmentMeta>> subpartitionFinishedSegmentMetas;
    private FileChannel channel;
    private long nextSegmentOffset = 0L;
    private final long[] subpartitionCurrentOffset;
    private final int[] subpartitionFreeSpaceInBytes;
    private final SegmentMeta[] currentSegmentMeta;
    private final int segmentSizeInBytes;
    private final BiConsumer<Integer, HsFileDataIndexImpl.InternalRegion> cacheRegionConsumer;
    private final boolean loadEntireSegmentToCache;

    public HsFileDataIndexSpilledRegionManagerImpl(int numSubpartitions, Path indexFilePath, int segmentSizeInBytes, long maxCacheCapacity, BiConsumer<Integer, HsFileDataIndexImpl.InternalRegion> cacheRegionConsumer) {
        try {
            this.channel = FileChannel.open(indexFilePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.READ, StandardOpenOption.WRITE);
        }
        catch (IOException e) {
            ExceptionUtils.rethrow((Throwable)e);
        }
        this.loadEntireSegmentToCache = HsFileDataIndexSpilledRegionManagerImpl.shouldLoadEntireSegmentToCache(numSubpartitions, segmentSizeInBytes, maxCacheCapacity);
        this.subpartitionFinishedSegmentMetas = new ArrayList<TreeMap<Integer, SegmentMeta>>(numSubpartitions);
        this.subpartitionCurrentOffset = new long[numSubpartitions];
        this.subpartitionFreeSpaceInBytes = new int[numSubpartitions];
        this.currentSegmentMeta = new SegmentMeta[numSubpartitions];
        for (int i = 0; i < numSubpartitions; ++i) {
            this.subpartitionFinishedSegmentMetas.add(new TreeMap());
        }
        this.cacheRegionConsumer = cacheRegionConsumer;
        this.segmentSizeInBytes = segmentSizeInBytes;
    }

    @Override
    public long findRegion(int subpartition, int bufferIndex, boolean loadToCache) {
        long regionOffset;
        SegmentMeta segmentMeta = this.currentSegmentMeta[subpartition];
        if (segmentMeta != null && (regionOffset = this.findRegionInSegment(subpartition, bufferIndex, segmentMeta, loadToCache)) != -1L) {
            return regionOffset;
        }
        TreeMap<Integer, SegmentMeta> subpartitionSegmentMetaTreeMap = this.subpartitionFinishedSegmentMetas.get(subpartition);
        for (SegmentMeta meta : subpartitionSegmentMetaTreeMap.headMap(bufferIndex, true).values()) {
            long regionOffset2 = this.findRegionInSegment(subpartition, bufferIndex, meta, loadToCache);
            if (regionOffset2 == -1L) continue;
            return regionOffset2;
        }
        return -1L;
    }

    private long findRegionInSegment(int subpartition, int bufferIndex, SegmentMeta meta, boolean loadToCache) {
        if (bufferIndex <= meta.getMaxBufferIndex()) {
            try {
                return this.readSegmentAndLoadToCacheIfNeeded(subpartition, bufferIndex, meta, loadToCache);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        return -1L;
    }

    private long readSegmentAndLoadToCacheIfNeeded(int subpartition, int bufferIndex, SegmentMeta meta, boolean loadToCache) throws IOException {
        List<Tuple2<HsFileDataIndexImpl.InternalRegion, Long>> regionAndOffsets = this.readSegment(meta.getOffset(), meta.getNumRegions());
        long targetRegionOffset = -1L;
        HsFileDataIndexImpl.InternalRegion targetRegion = null;
        Iterator<Tuple2<HsFileDataIndexImpl.InternalRegion, Long>> it = regionAndOffsets.iterator();
        while (it.hasNext()) {
            Tuple2<HsFileDataIndexImpl.InternalRegion, Long> regionAndOffset = it.next();
            HsFileDataIndexImpl.InternalRegion region = (HsFileDataIndexImpl.InternalRegion)regionAndOffset.f0;
            if (!region.containBuffer(bufferIndex)) continue;
            targetRegion = region;
            targetRegionOffset = (Long)regionAndOffset.f1;
            it.remove();
        }
        if (targetRegion != null && loadToCache) {
            if (this.loadEntireSegmentToCache) {
                regionAndOffsets.forEach(regionAndOffsetTuple -> this.cacheRegionConsumer.accept(subpartition, (HsFileDataIndexImpl.InternalRegion)regionAndOffsetTuple.f0));
                this.cacheRegionConsumer.accept(subpartition, targetRegion);
            } else {
                this.cacheRegionConsumer.accept(subpartition, targetRegion);
            }
        }
        return targetRegionOffset;
    }

    @Override
    public void appendOrOverwriteRegion(int subpartition, HsFileDataIndexImpl.InternalRegion newRegion) throws IOException {
        long oldRegionOffset = this.findRegion(subpartition, newRegion.getFirstBufferIndex(), false);
        if (oldRegionOffset != -1L) {
            this.writeRegionToOffset(oldRegionOffset, newRegion);
        } else {
            this.appendRegion(subpartition, newRegion);
        }
    }

    @Override
    public void close() throws IOException {
        if (this.channel != null) {
            this.channel.close();
        }
    }

    private static boolean shouldLoadEntireSegmentToCache(int numSubpartitions, int segmentSizeInBytes, long maxCacheCapacity) {
        return 2L * (long)numSubpartitions * (long)segmentSizeInBytes / 16L <= maxCacheCapacity;
    }

    private void appendRegion(int subpartition, HsFileDataIndexImpl.InternalRegion region) throws IOException {
        int regionSize = region.getSize();
        if (this.subpartitionFreeSpaceInBytes[subpartition] < regionSize) {
            this.startNewSegment(subpartition, Math.max(regionSize, this.segmentSizeInBytes));
        }
        this.writeRegionToOffset(this.subpartitionCurrentOffset[subpartition], region);
        this.updateSegment(subpartition, region);
    }

    private void writeRegionToOffset(long offset, HsFileDataIndexImpl.InternalRegion region) throws IOException {
        this.channel.position(offset);
        InternalRegionWriteReadUtils.writeRegionToFile(this.channel, this.regionHeaderBuffer, region);
    }

    private void startNewSegment(int subpartition, int newSegmentSize) {
        SegmentMeta oldSegmentMeta = this.currentSegmentMeta[subpartition];
        this.currentSegmentMeta[subpartition] = new SegmentMeta(this.nextSegmentOffset);
        this.subpartitionCurrentOffset[subpartition] = this.nextSegmentOffset;
        this.nextSegmentOffset += (long)newSegmentSize;
        this.subpartitionFreeSpaceInBytes[subpartition] = newSegmentSize;
        if (oldSegmentMeta != null) {
            this.subpartitionFinishedSegmentMetas.get(subpartition).put(oldSegmentMeta.minBufferIndex, oldSegmentMeta);
        }
    }

    private void updateSegment(int subpartition, HsFileDataIndexImpl.InternalRegion region) {
        int regionSize = region.getSize();
        int n = subpartition;
        this.subpartitionFreeSpaceInBytes[n] = this.subpartitionFreeSpaceInBytes[n] - regionSize;
        int n2 = subpartition;
        this.subpartitionCurrentOffset[n2] = this.subpartitionCurrentOffset[n2] + (long)regionSize;
        SegmentMeta segmentMeta = this.currentSegmentMeta[subpartition];
        segmentMeta.addRegion(region.getFirstBufferIndex(), region.getFirstBufferIndex() + region.getNumBuffers() - 1);
    }

    private List<Tuple2<HsFileDataIndexImpl.InternalRegion, Long>> readSegment(long offset, int numRegions) throws IOException {
        ArrayList<Tuple2<HsFileDataIndexImpl.InternalRegion, Long>> regionAndOffsets = new ArrayList<Tuple2<HsFileDataIndexImpl.InternalRegion, Long>>();
        for (int i = 0; i < numRegions; ++i) {
            HsFileDataIndexImpl.InternalRegion region = InternalRegionWriteReadUtils.readRegionFromFile(this.channel, this.regionHeaderBuffer, offset);
            regionAndOffsets.add((Tuple2<HsFileDataIndexImpl.InternalRegion, Long>)Tuple2.of((Object)region, (Object)offset));
            offset += (long)region.getSize();
        }
        return regionAndOffsets;
    }

    public static class Factory
    implements HsFileDataIndexSpilledRegionManager.Factory {
        private final int segmentSizeInBytes;
        private final long maxCacheCapacity;

        public Factory(int segmentSizeInBytes, long maxCacheCapacity) {
            this.segmentSizeInBytes = segmentSizeInBytes;
            this.maxCacheCapacity = maxCacheCapacity;
        }

        @Override
        public HsFileDataIndexSpilledRegionManager create(int numSubpartitions, Path indexFilePath, BiConsumer<Integer, HsFileDataIndexImpl.InternalRegion> cacheRegionConsumer) {
            return new HsFileDataIndexSpilledRegionManagerImpl(numSubpartitions, indexFilePath, this.segmentSizeInBytes, this.maxCacheCapacity, cacheRegionConsumer);
        }
    }

    private static class SegmentMeta {
        private int minBufferIndex;
        private int maxBufferIndex;
        private int numRegions;
        private final long offset;

        public SegmentMeta(long offset) {
            this.offset = offset;
            this.minBufferIndex = Integer.MAX_VALUE;
            this.maxBufferIndex = 0;
            this.numRegions = 0;
        }

        public int getMaxBufferIndex() {
            return this.maxBufferIndex;
        }

        public long getOffset() {
            return this.offset;
        }

        public int getNumRegions() {
            return this.numRegions;
        }

        public void addRegion(int firstBufferIndexOfRegion, int maxBufferIndexOfRegion) {
            if (firstBufferIndexOfRegion < this.minBufferIndex) {
                this.minBufferIndex = firstBufferIndexOfRegion;
            }
            if (maxBufferIndexOfRegion > this.maxBufferIndex) {
                this.maxBufferIndex = maxBufferIndexOfRegion;
            }
            ++this.numRegions;
        }
    }
}

