/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid;

import java.util.Collection;
import java.util.Deque;
import java.util.LinkedList;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.hybrid.HsBufferContext;
import org.apache.flink.runtime.io.network.partition.hybrid.HsConsumerId;
import org.apache.flink.runtime.io.network.partition.hybrid.HsDataView;
import org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManagerOperation;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

public class HsSubpartitionConsumerMemoryDataManager
implements HsDataView {
    @GuardedBy(value="consumerLock")
    private final Deque<HsBufferContext> unConsumedBuffers = new LinkedList<HsBufferContext>();
    private final Lock consumerLock;
    private final Lock resultPartitionLock;
    private final HsConsumerId consumerId;
    private final int subpartitionId;
    private final HsMemoryDataManagerOperation memoryDataManagerOperation;
    @GuardedBy(value="consumerLock")
    private int backlog = 0;

    public HsSubpartitionConsumerMemoryDataManager(Lock resultPartitionLock, Lock consumerLock, int subpartitionId, HsConsumerId consumerId, HsMemoryDataManagerOperation memoryDataManagerOperation) {
        this.resultPartitionLock = resultPartitionLock;
        this.consumerLock = consumerLock;
        this.subpartitionId = subpartitionId;
        this.consumerId = consumerId;
        this.memoryDataManagerOperation = memoryDataManagerOperation;
    }

    @GuardedBy(value="consumerLock")
    public void addInitialBuffers(Deque<HsBufferContext> buffers) {
        for (HsBufferContext bufferContext : buffers) {
            this.tryIncreaseBacklog(bufferContext.getBuffer());
        }
        this.unConsumedBuffers.addAll(buffers);
    }

    @GuardedBy(value="consumerLock")
    public boolean addBuffer(HsBufferContext bufferContext) {
        this.tryIncreaseBacklog(bufferContext.getBuffer());
        this.unConsumedBuffers.add(bufferContext);
        this.trimHeadingReleasedBuffers();
        return this.unConsumedBuffers.size() <= 1;
    }

    @Override
    public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int toConsumeIndex, Collection<Buffer> buffersToRecycle) {
        Optional bufferAndNextDataType = this.callWithLock(() -> {
            if (!this.checkFirstUnConsumedBufferIndex(toConsumeIndex)) {
                return Optional.empty();
            }
            HsBufferContext bufferContext = Preconditions.checkNotNull(this.unConsumedBuffers.pollFirst());
            this.tryDecreaseBacklog(bufferContext.getBuffer());
            bufferContext.consumed(this.consumerId);
            Buffer.DataType nextDataType = this.peekNextToConsumeDataTypeInternal(toConsumeIndex + 1);
            return Optional.of(Tuple2.of(bufferContext, nextDataType));
        });
        bufferAndNextDataType.ifPresent(tuple -> this.memoryDataManagerOperation.onBufferConsumed(((HsBufferContext)tuple.f0).getBufferIndexAndChannel()));
        return bufferAndNextDataType.map(tuple -> new ResultSubpartition.BufferAndBacklog(((HsBufferContext)tuple.f0).getBuffer().readOnlySlice(), this.getBacklog(), (Buffer.DataType)((Object)((Object)tuple.f1)), toConsumeIndex));
    }

    @Override
    public Buffer.DataType peekNextToConsumeDataType(int nextToConsumeIndex, Collection<Buffer> buffersToRecycle) {
        return this.callWithLock(() -> this.peekNextToConsumeDataTypeInternal(nextToConsumeIndex));
    }

    @GuardedBy(value="consumerLock")
    private Buffer.DataType peekNextToConsumeDataTypeInternal(int nextToConsumeIndex) {
        return this.checkFirstUnConsumedBufferIndex(nextToConsumeIndex) ? Preconditions.checkNotNull(this.unConsumedBuffers.peekFirst()).getBuffer().getDataType() : Buffer.DataType.NONE;
    }

    @GuardedBy(value="consumerLock")
    private boolean checkFirstUnConsumedBufferIndex(int expectedBufferIndex) {
        this.trimHeadingReleasedBuffers();
        return !this.unConsumedBuffers.isEmpty() && this.unConsumedBuffers.peekFirst().getBufferIndexAndChannel().getBufferIndex() == expectedBufferIndex;
    }

    @Override
    public int getBacklog() {
        return this.backlog;
    }

    @Override
    public void releaseDataView() {
        this.memoryDataManagerOperation.onConsumerReleased(this.subpartitionId, this.consumerId);
    }

    @GuardedBy(value="consumerLock")
    private void trimHeadingReleasedBuffers() {
        while (!this.unConsumedBuffers.isEmpty() && this.unConsumedBuffers.peekFirst().isReleased()) {
            this.tryDecreaseBacklog(this.unConsumedBuffers.removeFirst().getBuffer());
        }
    }

    @GuardedBy(value="consumerLock")
    private void tryIncreaseBacklog(Buffer buffer) {
        if (buffer.isBuffer()) {
            ++this.backlog;
        }
    }

    @GuardedBy(value="consumerLock")
    private void tryDecreaseBacklog(Buffer buffer) {
        if (buffer.isBuffer()) {
            --this.backlog;
        }
    }

    private <R, E extends Exception> R callWithLock(SupplierWithException<R, E> callable) throws E {
        try {
            this.resultPartitionLock.lock();
            this.consumerLock.lock();
            R r = callable.get();
            return r;
        }
        finally {
            this.consumerLock.unlock();
            this.resultPartitionLock.unlock();
        }
    }
}

