/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferHeader;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile;
import org.apache.flink.util.ExceptionUtils;

public class SegmentPartitionFileReader
implements PartitionFileReader {
    private final ByteBuffer reusedHeaderBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
    private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, Tuple2<ReadableByteChannel, Integer>>> openedChannelAndSegmentIds = new HashMap<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, Tuple2<ReadableByteChannel, Integer>>>();
    private final String dataFilePath;
    private FileSystem fileSystem;

    public SegmentPartitionFileReader(String dataFilePath) {
        this.dataFilePath = dataFilePath;
        try {
            this.fileSystem = new Path(dataFilePath).getFileSystem();
        }
        catch (IOException e) {
            ExceptionUtils.rethrow((Throwable)e, (String)"Failed to initialize the FileSystem.");
        }
    }

    @Override
    public PartitionFileReader.ReadBufferResult readBuffer(TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId, int segmentId, int bufferIndex, MemorySegment memorySegment, BufferRecycler recycler, @Nullable PartitionFileReader.ReadProgress readProgress, @Nullable CompositeBuffer partialBuffer) throws IOException {
        Map subpartitionInfo = this.openedChannelAndSegmentIds.computeIfAbsent(partitionId, ignore -> new HashMap());
        Tuple2 fileChannelAndSegmentId = subpartitionInfo.getOrDefault(subpartitionId, Tuple2.of(null, (Object)-1));
        ReadableByteChannel channel = (ReadableByteChannel)fileChannelAndSegmentId.f0;
        if (channel == null || (Integer)fileChannelAndSegmentId.f1 != segmentId) {
            if (channel != null) {
                channel.close();
            }
            if ((channel = this.openNewChannel(partitionId, subpartitionId, segmentId)) == null) {
                return null;
            }
            subpartitionInfo.put(subpartitionId, Tuple2.of((Object)channel, (Object)segmentId));
        }
        this.reusedHeaderBuffer.clear();
        int bufferHeaderResult = channel.read(this.reusedHeaderBuffer);
        if (bufferHeaderResult == -1) {
            channel.close();
            this.openedChannelAndSegmentIds.get(partitionId).remove(subpartitionId);
            return SegmentPartitionFileReader.getSingletonReadResult(new NetworkBuffer(memorySegment, recycler, Buffer.DataType.END_OF_SEGMENT));
        }
        this.reusedHeaderBuffer.flip();
        BufferHeader header = BufferReaderWriterUtil.parseBufferHeader(this.reusedHeaderBuffer);
        int dataBufferResult = channel.read(memorySegment.wrap(0, header.getLength()));
        if (dataBufferResult != header.getLength()) {
            channel.close();
            throw new IOException("The length of data buffer is illegal.");
        }
        Buffer.DataType dataType = header.getDataType();
        return SegmentPartitionFileReader.getSingletonReadResult(new NetworkBuffer(memorySegment, recycler, dataType, header.isCompressed(), header.getLength()));
    }

    @Override
    public long getPriority(TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId, int segmentId, int bufferIndex, @Nullable PartitionFileReader.ReadProgress readProgress) {
        return -1L;
    }

    private ReadableByteChannel openNewChannel(TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId, int segmentId) throws IOException {
        Path currentSegmentPath = SegmentPartitionFile.getSegmentPath(this.dataFilePath, partitionId, subpartitionId.getSubpartitionId(), segmentId);
        if (!this.fileSystem.exists(currentSegmentPath)) {
            return null;
        }
        return Channels.newChannel((InputStream)this.fileSystem.open(currentSegmentPath));
    }

    @Override
    public void release() {
        this.openedChannelAndSegmentIds.values().stream().map(Map::values).flatMap(Collection::stream).filter(Objects::nonNull).forEach(channel -> {
            try {
                ((ReadableByteChannel)channel.f0).close();
            }
            catch (IOException e) {
                ExceptionUtils.rethrow((Throwable)e);
            }
        });
    }

    private static PartitionFileReader.ReadBufferResult getSingletonReadResult(NetworkBuffer buffer) {
        return new PartitionFileReader.ReadBufferResult(Collections.singletonList(buffer), false, null);
    }
}

