/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.util.ArrayDeque;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.streaming.runtime.io.BufferStorage;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class CachedBufferStorage
implements BufferStorage {
    private static final Logger LOG = LoggerFactory.getLogger(CachedBufferStorage.class);
    private final long maxBufferedBytes;
    private final String taskName;
    private final ArrayDeque<BufferOrEventSequence> rolledOverBuffersQueue = new ArrayDeque();
    private BufferOrEventSequence rolledOverBuffers;
    private final int pageSize;
    private long rolledBytes;
    private long bytesBlocked;
    private ArrayDeque<BufferOrEvent> cachedBuffers;

    public CachedBufferStorage(int pageSize) {
        this(pageSize, -1L, "Unknown");
    }

    public CachedBufferStorage(int pageSize, long maxBufferedBytes, String taskName) {
        Preconditions.checkArgument((maxBufferedBytes == -1L || maxBufferedBytes > 0L ? 1 : 0) != 0);
        this.maxBufferedBytes = maxBufferedBytes;
        this.taskName = taskName;
        this.pageSize = pageSize;
        this.cachedBuffers = new ArrayDeque();
    }

    @Override
    public void add(BufferOrEvent boe) {
        this.bytesBlocked += (long)this.pageSize;
        this.cachedBuffers.add(boe);
    }

    @Override
    public void close() {
        BufferOrEvent boe;
        while ((boe = this.cachedBuffers.poll()) != null) {
            if (!boe.isBuffer()) continue;
            boe.getBuffer().recycleBuffer();
        }
        if (this.rolledOverBuffers != null) {
            this.rolledOverBuffers.cleanup();
        }
        for (BufferOrEventSequence seq : this.rolledOverBuffersQueue) {
            seq.cleanup();
        }
        this.rolledOverBuffersQueue.clear();
        this.rolledBytes = 0L;
    }

    @Override
    public long getPendingBytes() {
        return this.bytesBlocked;
    }

    @Override
    public boolean isFull() {
        return this.maxBufferedBytes > 0L && this.getRolledBytes() + this.getPendingBytes() > this.maxBufferedBytes;
    }

    @Override
    public void rollOver() {
        if (this.rolledOverBuffers == null) {
            this.rolledOverBuffers = this.rollOverCachedBuffers();
        } else {
            LOG.debug("{}: Checkpoint skipped via buffered data:Pushing back current alignment buffers and feeding back new alignment data first.", (Object)this.taskName);
            BufferOrEventSequence bufferedNow = this.rollOverCachedBuffers();
            if (bufferedNow != null) {
                this.rolledOverBuffersQueue.addFirst(this.rolledOverBuffers);
                this.rolledBytes += this.rolledOverBuffers.size();
                this.rolledOverBuffers = bufferedNow;
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Size of buffered data: {} bytes", (Object)this.taskName, (Object)(this.rolledOverBuffers == null ? 0L : this.rolledOverBuffers.size()));
        }
    }

    private BufferOrEventSequence rollOverCachedBuffers() {
        if (this.bytesBlocked == 0L) {
            return null;
        }
        BufferOrEventSequence currentSequence = new BufferOrEventSequence(this.cachedBuffers, this.bytesBlocked);
        this.cachedBuffers = new ArrayDeque();
        this.bytesBlocked = 0L;
        return currentSequence;
    }

    @Override
    public long getRolledBytes() {
        return this.rolledBytes;
    }

    @Override
    public boolean isEmpty() {
        return this.rolledOverBuffers == null;
    }

    @Override
    public Optional<BufferOrEvent> pollNext() {
        if (this.rolledOverBuffers == null) {
            return Optional.empty();
        }
        Optional<BufferOrEvent> next = Optional.ofNullable(this.rolledOverBuffers.getNext());
        if (!next.isPresent()) {
            this.completeBufferedSequence();
        }
        return next;
    }

    private void completeBufferedSequence() {
        LOG.debug("{}: Finished feeding back buffered data.", (Object)this.taskName);
        this.rolledOverBuffers.cleanup();
        this.rolledOverBuffers = this.rolledOverBuffersQueue.pollFirst();
        if (this.rolledOverBuffers != null) {
            this.rolledBytes -= this.rolledOverBuffers.size();
        }
    }

    @Override
    public long getMaxBufferedBytes() {
        return this.maxBufferedBytes;
    }

    private static class BufferOrEventSequence {
        private final ArrayDeque<BufferOrEvent> queuedBuffers;
        private final long size;

        BufferOrEventSequence(ArrayDeque<BufferOrEvent> buffers, long size) {
            this.queuedBuffers = buffers;
            this.size = size;
        }

        @Nullable
        public BufferOrEvent getNext() {
            return this.queuedBuffers.poll();
        }

        public void cleanup() {
            BufferOrEvent boe;
            while ((boe = this.queuedBuffers.poll()) != null) {
                if (!boe.isBuffer()) continue;
                boe.getBuffer().recycleBuffer();
            }
        }

        public long size() {
            return this.size;
        }
    }
}

