/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionReader;
import org.apache.flink.util.ExceptionUtils;

public class NettyConnectionReaderImpl
implements NettyConnectionReader {
    private final Supplier<InputChannel> inputChannelProvider;
    private final Map<Integer, Integer> lastRequiredSegmentIds = new HashMap<Integer, Integer>();

    public NettyConnectionReaderImpl(Supplier<InputChannel> inputChannelProvider) {
        this.inputChannelProvider = inputChannelProvider;
    }

    @Override
    public int peekNextBufferSubpartitionId() throws IOException {
        if (this.inputChannelProvider.get() instanceof RecoveredInputChannel) {
            return -1;
        }
        return this.inputChannelProvider.get().peekNextBufferSubpartitionId();
    }

    @Override
    public Optional<Buffer> readBuffer(int subpartitionId, int segmentId) {
        if ((long)segmentId > 0L && segmentId != this.lastRequiredSegmentIds.getOrDefault(subpartitionId, 0)) {
            this.lastRequiredSegmentIds.put(subpartitionId, segmentId);
            try {
                this.inputChannelProvider.get().notifyRequiredSegmentId(subpartitionId, segmentId);
            }
            catch (IOException e) {
                ExceptionUtils.rethrow(e, "Failed to notify required segment id");
            }
        }
        Optional<InputChannel.BufferAndAvailability> bufferAndAvailability = Optional.empty();
        try {
            if (this.inputChannelProvider.get().peekNextBufferSubpartitionId() != subpartitionId) {
                return Optional.empty();
            }
            bufferAndAvailability = this.inputChannelProvider.get().getNextBuffer();
        }
        catch (IOException | InterruptedException e) {
            ExceptionUtils.rethrow(e, "Failed to read buffer.");
        }
        return bufferAndAvailability.map(InputChannel.BufferAndAvailability::buffer);
    }
}

