/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyServiceProducer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskCacheManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskIOScheduler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

public class DiskTierProducerAgent
implements TierProducerAgent,
NettyServiceProducer {
    private final TieredStoragePartitionId partitionId;
    private final int numBuffersPerSegment;
    private final int bufferSizeBytes;
    private final Path dataFilePath;
    private final float minReservedDiskSpaceFraction;
    private final TieredStorageMemoryManager memoryManager;
    private final DiskCacheManager diskCacheManager;
    private final List<Map<Integer, Integer>> firstBufferIndexInSegment;
    private final int[] currentSubpartitionWriteBuffers;
    private final DiskIOScheduler diskIOScheduler;
    private final BufferCompressor bufferCompressor;
    private volatile boolean isReleased;

    DiskTierProducerAgent(TieredStoragePartitionId partitionId, int numSubpartitions, int numBytesPerSegment, int bufferSizeBytes, int maxCachedBytesBeforeFlush, Path dataFilePath, float minReservedDiskSpaceFraction, boolean isBroadcastOnly, PartitionFileWriter partitionFileWriter, PartitionFileReader partitionFileReader, TieredStorageMemoryManager memoryManager, TieredStorageNettyService nettyService, TieredStorageResourceRegistry resourceRegistry, BatchShuffleReadBufferPool bufferPool, ScheduledExecutorService ioExecutor, int maxRequestedBuffers, Duration bufferRequestTimeout, BufferCompressor bufferCompressor) {
        Preconditions.checkArgument(numBytesPerSegment >= bufferSizeBytes, "One segment should contain at least one buffer.");
        this.partitionId = partitionId;
        this.numBuffersPerSegment = numBytesPerSegment / bufferSizeBytes;
        this.bufferSizeBytes = bufferSizeBytes;
        this.dataFilePath = dataFilePath;
        this.minReservedDiskSpaceFraction = minReservedDiskSpaceFraction;
        this.memoryManager = memoryManager;
        this.firstBufferIndexInSegment = new ArrayList<Map<Integer, Integer>>();
        this.currentSubpartitionWriteBuffers = new int[numSubpartitions];
        this.bufferCompressor = bufferCompressor;
        for (int i = 0; i < numSubpartitions; ++i) {
            this.firstBufferIndexInSegment.add(new ConcurrentHashMap());
        }
        this.diskCacheManager = new DiskCacheManager(partitionId, isBroadcastOnly ? 1 : numSubpartitions, maxCachedBytesBeforeFlush, memoryManager, partitionFileWriter);
        this.diskIOScheduler = new DiskIOScheduler(partitionId, bufferPool, ioExecutor, maxRequestedBuffers, bufferRequestTimeout, this::firstBufferIndexToSegmentId, partitionFileReader);
        nettyService.registerProducer(partitionId, this);
        resourceRegistry.registerResource(partitionId, this::releaseResources);
    }

    @Override
    public boolean tryStartNewSegment(TieredStorageSubpartitionId subpartitionId, int segmentId, int minNumBuffers) {
        boolean canStartNewSegment;
        File filePath = this.dataFilePath.toFile();
        boolean bl = canStartNewSegment = filePath.getUsableSpace() - (long)Math.max(this.numBuffersPerSegment, minNumBuffers) * (long)this.bufferSizeBytes > (long)((float)filePath.getTotalSpace() * this.minReservedDiskSpaceFraction);
        if (canStartNewSegment) {
            this.firstBufferIndexInSegment.get(subpartitionId.getSubpartitionId()).put(this.diskCacheManager.getBufferIndex(subpartitionId.getSubpartitionId()), segmentId);
            this.diskCacheManager.startSegment(subpartitionId.getSubpartitionId(), segmentId);
        }
        return canStartNewSegment;
    }

    @Override
    public boolean tryWrite(TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer, Object bufferOwner, int numRemainingConsecutiveBuffers) {
        int subpartitionIndex = subpartitionId.getSubpartitionId();
        if (this.currentSubpartitionWriteBuffers[subpartitionIndex] != 0 && this.currentSubpartitionWriteBuffers[subpartitionIndex] + 1 + numRemainingConsecutiveBuffers > this.numBuffersPerSegment) {
            this.emitEndOfSegmentEvent(subpartitionIndex);
            this.currentSubpartitionWriteBuffers[subpartitionIndex] = 0;
            return false;
        }
        Buffer compressedBuffer = TieredStorageUtils.compressBufferIfPossible(finishedBuffer, this.bufferCompressor);
        if (compressedBuffer.isBuffer()) {
            this.memoryManager.transferBufferOwnership(bufferOwner, TieredStorageUtils.getDiskTierName(), compressedBuffer);
        }
        int n = subpartitionIndex;
        this.currentSubpartitionWriteBuffers[n] = this.currentSubpartitionWriteBuffers[n] + 1;
        this.emitBuffer(compressedBuffer, subpartitionIndex, numRemainingConsecutiveBuffers == 0);
        return true;
    }

    @Override
    public void connectionEstablished(TieredStorageSubpartitionId subpartitionId, NettyConnectionWriter nettyConnectionWriter) {
        if (!Files.isReadable(this.dataFilePath)) {
            throw new RuntimeException(new PartitionNotFoundException(TieredStorageIdMappingUtils.convertId(this.partitionId)));
        }
        this.diskIOScheduler.connectionEstablished(subpartitionId, nettyConnectionWriter);
    }

    @Override
    public void connectionBroken(NettyConnectionId connectionId) {
        this.diskIOScheduler.connectionBroken(connectionId);
    }

    @Override
    public void close() {
        this.diskCacheManager.close();
    }

    private void emitEndOfSegmentEvent(int subpartitionId) {
        try {
            this.diskCacheManager.appendEndOfSegmentEvent(EventSerializer.toSerializedEvent(EndOfSegmentEvent.INSTANCE), subpartitionId);
        }
        catch (IOException e) {
            ExceptionUtils.rethrow(e, "Failed to emit end of segment event.");
        }
    }

    private void emitBuffer(Buffer finishedBuffer, int subpartition, boolean flush) {
        this.diskCacheManager.append(finishedBuffer, subpartition, flush);
    }

    private void releaseResources() {
        if (!this.isReleased) {
            for (Map<Integer, Integer> subFirstBufferIndexInSegment : this.firstBufferIndexInSegment) {
                subFirstBufferIndexInSegment.clear();
            }
            this.diskCacheManager.release();
            this.diskIOScheduler.release();
            this.isReleased = true;
        }
    }

    private Integer firstBufferIndexToSegmentId(int subpartitionId, int bufferIndex) {
        return this.firstBufferIndexInSegment.size() > subpartitionId ? this.firstBufferIndexInSegment.get(subpartitionId).get(bufferIndex) : null;
    }
}

