/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.BufferWithSubpartition;
import org.apache.flink.runtime.io.network.partition.DataBuffer;
import org.apache.flink.runtime.io.network.partition.HashBasedDataBuffer;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PartitionedFile;
import org.apache.flink.runtime.io.network.partition.PartitionedFileWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SortBasedDataBuffer;
import org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionReadScheduler;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

@NotThreadSafe
public class SortMergeResultPartition
extends ResultPartition {
    private static final int NUM_WRITE_BUFFER_BYTES = 0x800000;
    private static final int EXPECTED_WRITE_BATCH_SIZE = 512;
    private final Object lock = new Object();
    @GuardedBy(value="lock")
    private PartitionedFile resultFile;
    private boolean hasNotifiedEndOfUserRecords;
    private final int networkBufferSize;
    @GuardedBy(value="lock")
    private PartitionedFileWriter fileWriter;
    private final String resultFileBasePath;
    private final int[] subpartitionOrder;
    private final BatchShuffleReadBufferPool readBufferPool;
    private final SortMergeResultPartitionReadScheduler readScheduler;
    private final LinkedList<MemorySegment> freeSegments = new LinkedList();
    private int numBuffersForSort;
    private boolean useHashBuffer;
    private DataBuffer broadcastDataBuffer;
    private DataBuffer unicastDataBuffer;

    public SortMergeResultPartition(String owningTaskName, int partitionIndex, ResultPartitionID partitionId, ResultPartitionType partitionType, int numSubpartitions, int numTargetKeyGroups, BatchShuffleReadBufferPool readBufferPool, Executor readIOExecutor, ResultPartitionManager partitionManager, String resultFileBasePath, @Nullable BufferCompressor bufferCompressor, SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
        super(owningTaskName, partitionIndex, partitionId, partitionType, numSubpartitions, numTargetKeyGroups, partitionManager, bufferCompressor, bufferPoolFactory);
        this.resultFileBasePath = (String)Preconditions.checkNotNull((Object)resultFileBasePath);
        this.readBufferPool = (BatchShuffleReadBufferPool)Preconditions.checkNotNull((Object)readBufferPool);
        this.networkBufferSize = readBufferPool.getBufferSize();
        this.subpartitionOrder = this.getRandomSubpartitionOrder(numSubpartitions);
        this.readScheduler = new SortMergeResultPartitionReadScheduler(readBufferPool, readIOExecutor, this.lock);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void setupInternal() throws IOException {
        Object object = this.lock;
        synchronized (object) {
            if (this.isReleased()) {
                throw new IOException("Result partition has been released.");
            }
            try {
                this.fileWriter = new PartitionedFileWriter(this.numSubpartitions, 0x400000, this.resultFileBasePath, this.subpartitionOrder);
            }
            catch (Throwable throwable) {
                throw new IOException("Failed to create file writer.", throwable);
            }
        }
        this.requestGuaranteedBuffers();
        this.readBufferPool.initialize();
        LOG.info("Sort-merge partition {} initialized.", (Object)this.getPartitionId());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void releaseInternal() {
        Object object = this.lock;
        synchronized (object) {
            if (this.resultFile == null && this.fileWriter != null) {
                this.fileWriter.releaseQuietly();
            }
        }
        this.readScheduler.release().thenRun(() -> {
            Object object = this.lock;
            synchronized (object) {
                if (this.resultFile != null) {
                    this.resultFile.deleteQuietly();
                    this.resultFile = null;
                }
            }
        });
    }

    @Override
    public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
        this.emit(record, targetSubpartition, Buffer.DataType.DATA_BUFFER, false);
    }

    @Override
    public void broadcastRecord(ByteBuffer record) throws IOException {
        this.broadcast(record, Buffer.DataType.DATA_BUFFER);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
        Buffer buffer = EventSerializer.toBuffer(event, isPriorityEvent);
        try {
            ByteBuffer serializedEvent = buffer.getNioBufferReadable();
            this.broadcast(serializedEvent, buffer.getDataType());
        }
        finally {
            buffer.recycleBuffer();
        }
    }

    @Override
    public void alignedBarrierTimeout(long checkpointId) throws IOException {
    }

    @Override
    public void abortCheckpoint(long checkpointId, CheckpointException cause) {
    }

    private void broadcast(ByteBuffer record, Buffer.DataType dataType) throws IOException {
        this.emit(record, 0, dataType, true);
    }

    private void emit(ByteBuffer record, int targetSubpartition, Buffer.DataType dataType, boolean isBroadcast) throws IOException {
        DataBuffer dataBuffer;
        this.checkInProduceState();
        DataBuffer dataBuffer2 = dataBuffer = isBroadcast ? this.getBroadcastDataBuffer() : this.getUnicastDataBuffer();
        if (!dataBuffer.append(record, targetSubpartition, dataType)) {
            return;
        }
        if (!dataBuffer.hasRemaining()) {
            dataBuffer.release();
            this.writeLargeRecord(record, targetSubpartition, dataType, isBroadcast);
            return;
        }
        this.flushDataBuffer(dataBuffer, isBroadcast);
        dataBuffer.release();
        if (record.hasRemaining()) {
            this.emit(record, targetSubpartition, dataType, isBroadcast);
        }
    }

    private void releaseDataBuffer(DataBuffer dataBuffer) {
        if (dataBuffer != null) {
            dataBuffer.release();
        }
    }

    private DataBuffer getUnicastDataBuffer() throws IOException {
        this.flushBroadcastDataBuffer();
        if (this.unicastDataBuffer != null && !this.unicastDataBuffer.isFinished() && !this.unicastDataBuffer.isReleased()) {
            return this.unicastDataBuffer;
        }
        this.unicastDataBuffer = this.createNewDataBuffer();
        return this.unicastDataBuffer;
    }

    private DataBuffer getBroadcastDataBuffer() throws IOException {
        this.flushUnicastDataBuffer();
        if (this.broadcastDataBuffer != null && !this.broadcastDataBuffer.isFinished() && !this.broadcastDataBuffer.isReleased()) {
            return this.broadcastDataBuffer;
        }
        this.broadcastDataBuffer = this.createNewDataBuffer();
        return this.broadcastDataBuffer;
    }

    private DataBuffer createNewDataBuffer() throws IOException {
        this.requestNetworkBuffers();
        if (this.useHashBuffer) {
            return new HashBasedDataBuffer(this.freeSegments, this.bufferPool, this.numSubpartitions, this.networkBufferSize, this.numBuffersForSort, this.subpartitionOrder);
        }
        return new SortBasedDataBuffer(this.freeSegments, this.bufferPool, this.numSubpartitions, this.networkBufferSize, this.numBuffersForSort, this.subpartitionOrder);
    }

    private void requestGuaranteedBuffers() throws IOException {
        int numRequiredBuffer = this.bufferPool.getNumberOfRequiredMemorySegments();
        if (numRequiredBuffer < 2) {
            throw new IOException(String.format("Too few sort buffers, please increase %s.", NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS));
        }
        try {
            while (this.freeSegments.size() < numRequiredBuffer) {
                this.freeSegments.add((MemorySegment)Preconditions.checkNotNull((Object)this.bufferPool.requestMemorySegmentBlocking()));
            }
        }
        catch (InterruptedException exception) {
            this.releaseFreeBuffers();
            throw new IOException("Failed to allocate buffers for result partition.", exception);
        }
    }

    private void requestNetworkBuffers() throws IOException {
        MemorySegment segment;
        this.requestGuaranteedBuffers();
        while (this.freeSegments.size() < this.bufferPool.getMaxNumberOfMemorySegments() && (segment = this.bufferPool.requestMemorySegment()) != null) {
            this.freeSegments.add(segment);
        }
        this.useHashBuffer = false;
        int numWriteBuffers = 0;
        if (this.freeSegments.size() >= 2 * this.numSubpartitions) {
            this.useHashBuffer = true;
        } else {
            numWriteBuffers = this.networkBufferSize >= 0x800000 ? 1 : Math.min(512, 0x800000 / this.networkBufferSize);
        }
        numWriteBuffers = Math.min(this.freeSegments.size() / 2, numWriteBuffers);
        this.numBuffersForSort = this.freeSegments.size() - numWriteBuffers;
    }

    private void flushDataBuffer(DataBuffer dataBuffer, boolean isBroadcast) throws IOException {
        if (dataBuffer == null || dataBuffer.isReleased() || !dataBuffer.hasRemaining()) {
            return;
        }
        dataBuffer.finish();
        ArrayDeque<MemorySegment> segments = new ArrayDeque<MemorySegment>(this.freeSegments);
        int numBuffersToWrite = this.useHashBuffer ? 512 : Math.min(512, segments.size());
        ArrayList<BufferWithSubpartition> toWrite = new ArrayList<BufferWithSubpartition>(numBuffersToWrite);
        this.fileWriter.startNewRegion(isBroadcast);
        while (true) {
            BufferWithSubpartition bufferWithSubpartition;
            if (toWrite.size() >= numBuffersToWrite) {
                this.writeBuffers(toWrite);
                segments = new ArrayDeque<MemorySegment>(this.freeSegments);
            }
            if ((bufferWithSubpartition = dataBuffer.getNextBuffer((MemorySegment)segments.poll())) == null) break;
            this.updateStatistics(bufferWithSubpartition, isBroadcast);
            toWrite.add(this.compressBufferIfPossible(bufferWithSubpartition));
        }
        this.writeBuffers(toWrite);
        this.releaseFreeBuffers();
    }

    private void flushBroadcastDataBuffer() throws IOException {
        if (this.broadcastDataBuffer != null) {
            this.flushDataBuffer(this.broadcastDataBuffer, true);
            this.broadcastDataBuffer.release();
            this.broadcastDataBuffer = null;
        }
    }

    private void flushUnicastDataBuffer() throws IOException {
        if (this.unicastDataBuffer != null) {
            this.flushDataBuffer(this.unicastDataBuffer, false);
            this.unicastDataBuffer.release();
            this.unicastDataBuffer = null;
        }
    }

    private BufferWithSubpartition compressBufferIfPossible(BufferWithSubpartition bufferWithSubpartition) {
        Buffer buffer = bufferWithSubpartition.getBuffer();
        if (!this.canBeCompressed(buffer)) {
            return bufferWithSubpartition;
        }
        buffer = ((BufferCompressor)Preconditions.checkNotNull((Object)this.bufferCompressor)).compressToOriginalBuffer(buffer);
        return new BufferWithSubpartition(buffer, bufferWithSubpartition.getSubpartitionIndex());
    }

    private void updateStatistics(BufferWithSubpartition bufferWithSubpartition, boolean isBroadcast) {
        this.numBuffersOut.inc(isBroadcast ? (long)this.numSubpartitions : 1L);
        long readableBytes = bufferWithSubpartition.getBuffer().readableBytes();
        if (isBroadcast) {
            this.resultPartitionBytes.incAll(readableBytes);
        } else {
            this.resultPartitionBytes.inc(bufferWithSubpartition.getSubpartitionIndex(), readableBytes);
        }
        this.numBytesOut.inc(isBroadcast ? readableBytes * (long)this.numSubpartitions : readableBytes);
    }

    private void writeLargeRecord(ByteBuffer record, int targetSubpartition, Buffer.DataType dataType, boolean isBroadcast) throws IOException {
        this.fileWriter.startNewRegion(isBroadcast);
        ArrayList<BufferWithSubpartition> toWrite = new ArrayList<BufferWithSubpartition>();
        ArrayDeque<MemorySegment> segments = new ArrayDeque<MemorySegment>(this.freeSegments);
        while (record.hasRemaining()) {
            if (segments.isEmpty()) {
                this.fileWriter.writeBuffers(toWrite);
                toWrite.clear();
                segments = new ArrayDeque<MemorySegment>(this.freeSegments);
            }
            int toCopy = Math.min(record.remaining(), this.networkBufferSize);
            MemorySegment writeBuffer = (MemorySegment)Preconditions.checkNotNull((Object)((MemorySegment)segments.poll()));
            writeBuffer.put(0, record, toCopy);
            NetworkBuffer buffer = new NetworkBuffer(writeBuffer, buf -> {}, dataType, toCopy);
            BufferWithSubpartition bufferWithSubpartition = new BufferWithSubpartition(buffer, targetSubpartition);
            this.updateStatistics(bufferWithSubpartition, isBroadcast);
            toWrite.add(this.compressBufferIfPossible(bufferWithSubpartition));
        }
        this.fileWriter.writeBuffers(toWrite);
        this.releaseFreeBuffers();
    }

    private void writeBuffers(List<BufferWithSubpartition> buffers) throws IOException {
        this.fileWriter.writeBuffers(buffers);
        buffers.forEach(buffer -> buffer.getBuffer().recycleBuffer());
        buffers.clear();
    }

    @Override
    public void notifyEndOfData(StopMode mode) throws IOException {
        if (!this.hasNotifiedEndOfUserRecords) {
            this.broadcastEvent(new EndOfData(mode), false);
            this.hasNotifiedEndOfUserRecords = true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void finish() throws IOException {
        this.broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
        Preconditions.checkState((this.unicastDataBuffer == null ? 1 : 0) != 0, (Object)"The unicast sort buffer should be either null or released.");
        this.flushBroadcastDataBuffer();
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((!this.isReleased() ? 1 : 0) != 0, (Object)"Result partition is already released.");
            this.resultFile = this.fileWriter.finish();
            super.finish();
            LOG.info("New partitioned file produced: {}.", (Object)this.resultFile);
        }
    }

    private void releaseFreeBuffers() {
        if (this.bufferPool != null) {
            this.freeSegments.forEach(buffer -> this.bufferPool.recycle((MemorySegment)buffer));
            this.freeSegments.clear();
        }
    }

    @Override
    public void close() {
        this.releaseFreeBuffers();
        this.releaseDataBuffer(this.unicastDataBuffer);
        this.releaseDataBuffer(this.broadcastDataBuffer);
        super.close();
        IOUtils.closeQuietly((AutoCloseable)this.fileWriter);
    }

    @Override
    protected ResultSubpartitionView createSubpartitionView(int subpartitionIndex, BufferAvailabilityListener availabilityListener) throws IOException {
        throw new IllegalStateException("This method should not be called for a sort merge result partition.");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ResultSubpartitionView createSubpartitionView(ResultSubpartitionIndexSet indexSet, BufferAvailabilityListener availabilityListener) throws IOException {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkElementIndex((int)indexSet.getEndIndex(), (int)this.numSubpartitions, (String)"Subpartition not found.");
            Preconditions.checkState((!this.isReleased() ? 1 : 0) != 0, (Object)"Partition released.");
            Preconditions.checkState((boolean)this.isFinished(), (Object)"Trying to read unfinished blocking partition.");
            if (!this.resultFile.isReadable()) {
                throw new PartitionNotFoundException(this.getPartitionId());
            }
            return this.readScheduler.createSubpartitionReader(availabilityListener, indexSet, this.resultFile, this.subpartitionOrder[0]);
        }
    }

    @Override
    public void flushAll() {
    }

    @Override
    public void flush(int subpartitionIndex) {
    }

    @Override
    public CompletableFuture<?> getAvailableFuture() {
        return AVAILABLE;
    }

    @Override
    public int getNumberOfQueuedBuffers() {
        return 0;
    }

    @Override
    public long getSizeOfQueuedBuffersUnsafe() {
        return 0L;
    }

    @Override
    public int getNumberOfQueuedBuffers(int targetSubpartition) {
        return 0;
    }

    private int[] getRandomSubpartitionOrder(int numSubpartitions) {
        int[] order = new int[numSubpartitions];
        Random random = new Random();
        int shift = random.nextInt(numSubpartitions);
        for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
            order[(subpartition + shift) % numSubpartitions] = subpartition;
        }
        return order;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    PartitionedFile getResultFile() {
        Object object = this.lock;
        synchronized (object) {
            return this.resultFile;
        }
    }
}

