/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.hybrid.HsConsumerId;
import org.apache.flink.runtime.io.network.partition.hybrid.HsDataView;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataManager;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFullSpillingStrategy;
import org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManager;
import org.apache.flink.runtime.io.network.partition.hybrid.HsOutputMetrics;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSelectiveSpillingStrategy;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionConsumer;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReaderImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

public class HsResultPartition
extends ResultPartition {
    public static final String DATA_FILE_SUFFIX = ".hybrid.data";
    public static final String INDEX_FILE_SUFFIX = ".hybrid.index";
    public static final int BROADCAST_SUBPARTITION = 0;
    private final HsFileDataIndex dataIndex;
    private final HsFileDataManager fileDataManager;
    private final Path dataFilePath;
    private final int networkBufferSize;
    private final HybridShuffleConfiguration hybridShuffleConfiguration;
    private final HsConsumerId[] lastConsumerIds;
    private boolean hasNotifiedEndOfUserRecords;
    @Nullable
    private HsMemoryDataManager memoryDataManager;
    private final boolean isBroadcastOnly;

    public HsResultPartition(String owningTaskName, int partitionIndex, ResultPartitionID partitionId, ResultPartitionType partitionType, int numSubpartitions, int numTargetKeyGroups, BatchShuffleReadBufferPool readBufferPool, ScheduledExecutorService readIOExecutor, ResultPartitionManager partitionManager, String dataFileBashPath, int networkBufferSize, HybridShuffleConfiguration hybridShuffleConfiguration, @Nullable BufferCompressor bufferCompressor, boolean isBroadcastOnly, SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
        super(owningTaskName, partitionIndex, partitionId, partitionType, numSubpartitions, numTargetKeyGroups, partitionManager, bufferCompressor, bufferPoolFactory);
        this.networkBufferSize = networkBufferSize;
        this.dataFilePath = new File(dataFileBashPath + DATA_FILE_SUFFIX).toPath();
        this.dataIndex = new HsFileDataIndexImpl(isBroadcastOnly ? 1 : numSubpartitions, new File(dataFileBashPath + INDEX_FILE_SUFFIX).toPath(), hybridShuffleConfiguration.getRegionGroupSizeInBytes(), hybridShuffleConfiguration.getNumRetainedInMemoryRegionsMax());
        this.hybridShuffleConfiguration = hybridShuffleConfiguration;
        this.isBroadcastOnly = isBroadcastOnly;
        this.fileDataManager = new HsFileDataManager(readBufferPool, readIOExecutor, this.dataIndex, this.dataFilePath, HsSubpartitionFileReaderImpl.Factory.INSTANCE, hybridShuffleConfiguration);
        this.lastConsumerIds = new HsConsumerId[numSubpartitions];
    }

    @Override
    protected void setupInternal() throws IOException {
        if (this.isReleased()) {
            throw new IOException("Result partition has been released.");
        }
        this.fileDataManager.setup();
        this.memoryDataManager = new HsMemoryDataManager(this.isBroadcastOnly ? 1 : this.numSubpartitions, this.networkBufferSize, this.bufferPool, this.getSpillingStrategy(this.hybridShuffleConfiguration), this.dataIndex, this.dataFilePath, this.bufferCompressor, this.hybridShuffleConfiguration.getBufferPoolSizeCheckIntervalMs());
    }

    @Override
    public void setMetricGroup(TaskIOMetricGroup metrics) {
        super.setMetricGroup(metrics);
        Preconditions.checkNotNull(this.memoryDataManager).setOutputMetrics(new HsOutputMetrics(this.numBytesOut, this.numBuffersOut, metrics.getHardBackPressuredTimePerSecond()));
    }

    @Override
    public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
        this.resultPartitionBytes.inc(targetSubpartition, record.remaining());
        this.emit(record, targetSubpartition, Buffer.DataType.DATA_BUFFER);
    }

    @Override
    public void broadcastRecord(ByteBuffer record) throws IOException {
        this.broadcast(record, Buffer.DataType.DATA_BUFFER);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
        Buffer buffer = EventSerializer.toBuffer(event, isPriorityEvent);
        try {
            ByteBuffer serializedEvent = buffer.getNioBufferReadable();
            this.broadcast(serializedEvent, buffer.getDataType());
        }
        finally {
            buffer.recycleBuffer();
        }
    }

    private void broadcast(ByteBuffer record, Buffer.DataType dataType) throws IOException {
        this.resultPartitionBytes.incAll(record.remaining());
        if (this.isBroadcastOnly) {
            this.emit(record, 0, dataType);
        } else {
            for (int i = 0; i < this.numSubpartitions; ++i) {
                this.emit(record.duplicate(), i, dataType);
            }
        }
    }

    private void emit(ByteBuffer record, int targetSubpartition, Buffer.DataType dataType) throws IOException {
        this.checkInProduceState();
        Preconditions.checkNotNull(this.memoryDataManager).append(record, targetSubpartition, dataType);
    }

    @Override
    protected ResultSubpartitionView createSubpartitionView(int subpartitionId, BufferAvailabilityListener availabilityListener) throws IOException {
        HsConsumerId consumerId;
        Preconditions.checkState(!this.isReleased(), "ResultPartition already released.");
        if (!Files.isReadable(this.dataFilePath)) {
            throw new PartitionNotFoundException(this.getPartitionId());
        }
        subpartitionId = this.isBroadcastOnly ? 0 : subpartitionId;
        HsSubpartitionConsumer subpartitionConsumer = new HsSubpartitionConsumer(availabilityListener);
        HsConsumerId lastConsumerId = this.lastConsumerIds[subpartitionId];
        this.checkMultipleConsumerIsAllowed(lastConsumerId, this.hybridShuffleConfiguration);
        this.lastConsumerIds[subpartitionId] = consumerId = HsConsumerId.newId(lastConsumerId);
        HsDataView diskDataView = this.fileDataManager.registerNewConsumer(subpartitionId, consumerId, subpartitionConsumer);
        HsDataView memoryDataView = Preconditions.checkNotNull(this.memoryDataManager).registerNewConsumer(subpartitionId, consumerId, subpartitionConsumer);
        subpartitionConsumer.setDiskDataView(diskDataView);
        subpartitionConsumer.setMemoryDataView(memoryDataView);
        return subpartitionConsumer;
    }

    @Override
    public void alignedBarrierTimeout(long checkpointId) throws IOException {
    }

    @Override
    public void abortCheckpoint(long checkpointId, CheckpointException cause) {
    }

    @Override
    public void flushAll() {
    }

    @Override
    public void flush(int subpartitionIndex) {
    }

    @Override
    public void finish() throws IOException {
        this.broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
        Preconditions.checkState(!this.isReleased(), "Result partition is already released.");
        super.finish();
    }

    @Override
    public void close() {
        Preconditions.checkNotNull(this.memoryDataManager).close();
        super.close();
    }

    @Override
    protected void releaseInternal() {
        this.fileDataManager.release();
    }

    @Override
    public int getNumberOfQueuedBuffers() {
        return 0;
    }

    @Override
    public long getSizeOfQueuedBuffersUnsafe() {
        return 0L;
    }

    @Override
    public int getNumberOfQueuedBuffers(int targetSubpartition) {
        return 0;
    }

    @Override
    public void notifyEndOfData(StopMode mode) throws IOException {
        if (!this.hasNotifiedEndOfUserRecords) {
            this.broadcastEvent(new EndOfData(mode), false);
            this.hasNotifiedEndOfUserRecords = true;
        }
    }

    private HsSpillingStrategy getSpillingStrategy(HybridShuffleConfiguration hybridShuffleConfiguration) {
        switch (hybridShuffleConfiguration.getSpillingStrategyType()) {
            case FULL: {
                return new HsFullSpillingStrategy(hybridShuffleConfiguration);
            }
            case SELECTIVE: {
                return new HsSelectiveSpillingStrategy(hybridShuffleConfiguration);
            }
        }
        throw new IllegalConfigurationException("Illegal spilling strategy.");
    }

    private void checkMultipleConsumerIsAllowed(HsConsumerId lastConsumerId, HybridShuffleConfiguration hybridShuffleConfiguration) {
        if (hybridShuffleConfiguration.getSpillingStrategyType() == HybridShuffleConfiguration.SpillingStrategyType.SELECTIVE) {
            Preconditions.checkState(lastConsumerId == null, "Multiple consumer is not allowed for %s spilling strategy mode", new Object[]{hybridShuffleConfiguration.getSpillingStrategyType()});
        }
    }
}

