/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.buffer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NetworkBufferPool
implements BufferPoolFactory {
    private static final Logger LOG = LoggerFactory.getLogger(NetworkBufferPool.class);
    private final int totalNumberOfMemorySegments;
    private final int memorySegmentSize;
    private final Queue<MemorySegment> availableMemorySegments;
    private volatile boolean isDestroyed;
    private final Object factoryLock;
    private final Set<LocalBufferPool> managedBufferPools;
    public final Set<LocalBufferPool> allBufferPools;
    private int numTotalRequiredBuffers;

    public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize, MemoryType memoryType) {
        long sizeInLong;
        block8: {
            this.factoryLock = new Object();
            this.managedBufferPools = new HashSet<LocalBufferPool>();
            this.allBufferPools = new HashSet<LocalBufferPool>();
            Preconditions.checkNotNull((Object)memoryType);
            this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
            this.memorySegmentSize = segmentSize;
            sizeInLong = segmentSize;
            try {
                this.availableMemorySegments = new ArrayBlockingQueue<MemorySegment>(numberOfSegmentsToAllocate);
            }
            catch (OutOfMemoryError err) {
                throw new OutOfMemoryError("Could not allocate buffer queue of length " + numberOfSegmentsToAllocate + " - " + err.getMessage());
            }
            try {
                if (memoryType == MemoryType.HEAP) {
                    for (int i = 0; i < numberOfSegmentsToAllocate; ++i) {
                        byte[] memory = new byte[segmentSize];
                        this.availableMemorySegments.add(MemorySegmentFactory.wrapPooledHeapMemory((byte[])memory, null));
                    }
                    break block8;
                }
                if (memoryType == MemoryType.OFF_HEAP) {
                    for (int i = 0; i < numberOfSegmentsToAllocate; ++i) {
                        ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
                        this.availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory((ByteBuffer)memory, null));
                    }
                    break block8;
                }
                throw new IllegalArgumentException("Unknown memory type " + memoryType);
            }
            catch (OutOfMemoryError err) {
                int allocated = this.availableMemorySegments.size();
                this.availableMemorySegments.clear();
                long requiredMb = sizeInLong * (long)numberOfSegmentsToAllocate >> 20;
                long allocatedMb = sizeInLong * (long)allocated >> 20;
                long missingMb = requiredMb - allocatedMb;
                throw new OutOfMemoryError("Could not allocate enough memory segments for NetworkBufferPool (required (Mb): " + requiredMb + ", allocated (Mb): " + allocatedMb + ", missing (Mb): " + missingMb + "). Cause: " + err.getMessage());
            }
        }
        long allocatedMb = sizeInLong * (long)this.availableMemorySegments.size() >> 20;
        LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).", new Object[]{allocatedMb, this.availableMemorySegments.size(), segmentSize});
    }

    public MemorySegment requestMemorySegment() {
        return this.availableMemorySegments.poll();
    }

    public void recycle(MemorySegment segment) {
        this.availableMemorySegments.add(segment);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void destroy() {
        Object object = this.factoryLock;
        synchronized (object) {
            MemorySegment segment;
            this.isDestroyed = true;
            while ((segment = this.availableMemorySegments.poll()) != null) {
                segment.free();
            }
        }
    }

    public boolean isDestroyed() {
        return this.isDestroyed;
    }

    public int getMemorySegmentSize() {
        return this.memorySegmentSize;
    }

    public int getTotalNumberOfMemorySegments() {
        return this.totalNumberOfMemorySegments;
    }

    public int getNumberOfAvailableMemorySegments() {
        return this.availableMemorySegments.size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int getNumberOfRegisteredBufferPools() {
        Object object = this.factoryLock;
        synchronized (object) {
            return this.allBufferPools.size();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int countBuffers() {
        int buffers = 0;
        Object object = this.factoryLock;
        synchronized (object) {
            for (BufferPool bufferPool : this.allBufferPools) {
                buffers += bufferPool.getNumBuffers();
            }
        }
        return buffers;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public BufferPool createBufferPool(int numRequiredBuffers, boolean isFixedSize) throws IOException {
        Object object = this.factoryLock;
        synchronized (object) {
            if (this.isDestroyed) {
                throw new IllegalStateException("Network buffer pool has already been destroyed.");
            }
            if (this.numTotalRequiredBuffers + numRequiredBuffers > this.totalNumberOfMemorySegments) {
                throw new IOException(String.format("Insufficient number of network buffers: required %d, but only %d available. The total number of network buffers is currently set to %d. You can increase this number by setting the configuration key '%s'.", numRequiredBuffers, this.totalNumberOfMemorySegments - this.numTotalRequiredBuffers, this.totalNumberOfMemorySegments, "taskmanager.network.numberOfBuffers"));
            }
            this.numTotalRequiredBuffers += numRequiredBuffers;
            LocalBufferPool localBufferPool = new LocalBufferPool(this, numRequiredBuffers);
            if (!isFixedSize) {
                this.managedBufferPools.add(localBufferPool);
            }
            this.allBufferPools.add(localBufferPool);
            this.redistributeBuffers();
            return localBufferPool;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void destroyBufferPool(BufferPool bufferPool) {
        if (!(bufferPool instanceof LocalBufferPool)) {
            throw new IllegalArgumentException("bufferPool is no LocalBufferPool");
        }
        Object object = this.factoryLock;
        synchronized (object) {
            if (this.allBufferPools.remove(bufferPool)) {
                this.managedBufferPools.remove(bufferPool);
                this.numTotalRequiredBuffers -= bufferPool.getNumberOfRequiredMemorySegments();
                try {
                    this.redistributeBuffers();
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void destroyAllBufferPools() {
        Object object = this.factoryLock;
        synchronized (object) {
            LocalBufferPool[] poolsCopy;
            for (LocalBufferPool pool : poolsCopy = this.allBufferPools.toArray(new LocalBufferPool[this.allBufferPools.size()])) {
                pool.lazyDestroy();
            }
            if (this.allBufferPools.size() > 0 || this.managedBufferPools.size() > 0 || this.numTotalRequiredBuffers > 0) {
                throw new IllegalStateException("NetworkBufferPool is not empty after destroying all LocalBufferPools");
            }
        }
    }

    private void redistributeBuffers() throws IOException {
        int numManagedBufferPools = this.managedBufferPools.size();
        if (numManagedBufferPools == 0) {
            return;
        }
        int numAvailableMemorySegment = this.totalNumberOfMemorySegments - this.numTotalRequiredBuffers;
        int numExcessBuffersPerPool = numAvailableMemorySegment / numManagedBufferPools;
        int numLeftoverBuffers = numAvailableMemorySegment % numManagedBufferPools;
        int bufferPoolIndex = 0;
        for (LocalBufferPool bufferPool : this.managedBufferPools) {
            int leftoverBuffers = bufferPoolIndex++ < numLeftoverBuffers ? 1 : 0;
            bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + numExcessBuffersPerPool + leftoverBuffers);
        }
    }
}

