/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.io.Closeable;
import java.io.IOException;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializer;
import org.apache.flink.runtime.checkpoint.channel.RecoveredChannelStateHandler;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;

/**
 * Reads one chunk of channel state from an input stream and passes it, buffer by buffer, to a
 * {@link RecoveredChannelStateHandler}.
 *
 * <p>The chunk length and the chunk data are both decoded through the {@link
 * ChannelStateSerializer} supplied at construction time.
 */
class ChannelStateChunkReader {
    private final ChannelStateSerializer serializer;

    ChannelStateChunkReader(ChannelStateSerializer serializer) {
        this.serializer = serializer;
    }

    /**
     * Reads the chunk located at {@code sourceOffset} and hands its content to {@code
     * stateHandler}.
     *
     * <p>The chunk length is obtained from {@code serializer.readLength(source)}. While that
     * length has not been used up, a buffer is requested from the handler, filled from the
     * stream, and then given back through {@code stateHandler.recover(...)}.
     *
     * @param source stream to read from; it is repositioned only if its current position differs
     *     from {@code sourceOffset}
     * @param sourceOffset position in {@code source} at which the chunk starts
     * @param stateHandler provides the target buffers and receives them once they are filled
     * @param channelInfo passed unchanged to {@code getBuffer} and {@code recover}
     * @param oldSubtaskIndex passed unchanged to {@code recover}
     * @throws IOException propagated from the stream, the serializer or the handler
     * @throws InterruptedException propagated from the calls made by this method
     */
    <Info, Context> void readChunk(FSDataInputStream source, long sourceOffset, RecoveredChannelStateHandler<Info, Context> stateHandler, Info channelInfo, int oldSubtaskIndex) throws IOException, InterruptedException {
        // Seek only when the stream is not already at the start of the chunk.
        if (source.getPos() != sourceOffset) {
            source.seek(sourceOffset);
        }
        for (int remaining = this.serializer.readLength(source); remaining > 0; ) {
            final RecoveredChannelStateHandler.BufferWithContext<Context> target = stateHandler.getBuffer(channelInfo);
            remaining = this.fill(source, target, remaining);
            // Reached only when filling succeeded; the filled buffer is handed to the handler.
            stateHandler.recover(channelInfo, oldSubtaskIndex, target);
        }
    }

    /**
     * Copies data from {@code source} into {@code target} until either {@code remaining} drops to
     * zero or the target buffer reports that it is no longer writable. The copy is wrapped in the
     * {@link NetworkActionsLogger#measureIO} scope.
     *
     * <p>If anything in that scope throws an {@link Exception}, {@code target} is closed before
     * the exception is rethrown.
     *
     * @param source stream positioned at the next unread part of the chunk
     * @param target buffer wrapper obtained from the state handler
     * @param remaining length of the chunk that has not been read yet
     * @return the remaining length after this fill step
     */
    private int fill(FSDataInputStream source, RecoveredChannelStateHandler.BufferWithContext<?> target, int remaining) throws IOException, InterruptedException {
        try (Closeable ignored = NetworkActionsLogger.measureIO("ChannelStateChunkReader#readChunk", target.buffer)) {
            while (remaining > 0 && target.buffer.isWritable()) {
                // The value returned by readData is deducted from what is still outstanding.
                remaining -= this.serializer.readData(source, target.buffer, remaining);
            }
            return remaining;
        } catch (Exception e) {
            // The handler never receives this buffer, so release it here.
            target.close();
            throw e;
        }
    }
}

