/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.metadata;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase;
import org.apache.flink.runtime.state.AbstractChannelStateHandle;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.QuadFunction;

/**
 * Writes and reads {@link InputChannelStateHandle} and {@link ResultSubpartitionStateHandle}
 * instances to and from checkpoint metadata streams.
 *
 * <p>Both handle kinds share one layout, produced by {@link #serializeChannelStateHandle} and
 * consumed by {@link #deserializeChannelStateHandle}:
 *
 * <ol>
 *   <li>subtask index ({@code int})
 *   <li>channel info, two {@code int}s (gate index + input channel index, or partition index +
 *       subpartition index)
 *   <li>number of offsets ({@code int}) followed by that many {@code long} offsets
 *   <li>state size ({@code long})
 *   <li>the delegate stream state handle, as written by
 *       {@link MetadataV2V3SerializerBase#serializeStreamStateHandle}
 * </ol>
 */
class ChannelStateHandleSerializer {
    ChannelStateHandleSerializer() {
    }

    /**
     * Serializes a result-subpartition state handle.
     *
     * @param handle the handle to write
     * @param dataOutputStream the stream to write to
     * @throws IOException if writing to the stream fails
     */
    public void serialize(ResultSubpartitionStateHandle handle, DataOutputStream dataOutputStream) throws IOException {
        ChannelStateHandleSerializer.serializeChannelStateHandle(handle, dataOutputStream, (info, out) -> {
            out.writeInt(info.getPartitionIdx());
            out.writeInt(info.getSubPartitionIdx());
        });
    }

    /**
     * Reads a result-subpartition state handle previously written by
     * {@link #serialize(ResultSubpartitionStateHandle, DataOutputStream)}.
     *
     * @param dis the stream to read from
     * @param context passed through to the delegate stream state handle deserialization
     * @return the reconstructed handle
     * @throws IOException if reading fails or the encoded offsets count is negative
     */
    ResultSubpartitionStateHandle deserializeResultSubpartitionStateHandle(DataInputStream dis, MetadataV2V3SerializerBase.DeserializationContext context) throws IOException {
        return ChannelStateHandleSerializer.deserializeChannelStateHandle(
                // Read order mirrors the write order: partition index, then subpartition index.
                in -> new ResultSubpartitionInfo(in.readInt(), in.readInt()),
                ResultSubpartitionStateHandle::new,
                dis,
                context);
    }

    /**
     * Serializes an input-channel state handle.
     *
     * @param handle the handle to write
     * @param dos the stream to write to
     * @throws IOException if writing to the stream fails
     */
    public void serialize(InputChannelStateHandle handle, DataOutputStream dos) throws IOException {
        // The lambda writes to the stream it is handed rather than capturing the outer one,
        // matching the result-subpartition variant above.
        ChannelStateHandleSerializer.serializeChannelStateHandle(handle, dos, (info, out) -> {
            out.writeInt(info.getGateIdx());
            out.writeInt(info.getInputChannelIdx());
        });
    }

    /**
     * Reads an input-channel state handle previously written by
     * {@link #serialize(InputChannelStateHandle, DataOutputStream)}.
     *
     * @param dis the stream to read from
     * @param context passed through to the delegate stream state handle deserialization
     * @return the reconstructed handle
     * @throws IOException if reading fails or the encoded offsets count is negative
     */
    InputChannelStateHandle deserializeInputChannelStateHandle(DataInputStream dis, MetadataV2V3SerializerBase.DeserializationContext context) throws IOException {
        return ChannelStateHandleSerializer.deserializeChannelStateHandle(
                // Read order mirrors the write order: gate index, then input channel index.
                in -> new InputChannelInfo(in.readInt(), in.readInt()),
                InputChannelStateHandle::new,
                dis,
                context);
    }

    /**
     * Writes the fields common to every channel state handle, delegating the channel-specific
     * info to {@code infoWriter}.
     *
     * @param handle the handle to write
     * @param dos the stream to write to
     * @param infoWriter writes the channel info; invoked right after the subtask index
     * @throws IOException if writing to the stream fails
     */
    private static <I> void serializeChannelStateHandle(AbstractChannelStateHandle<I> handle, DataOutputStream dos, BiConsumerWithException<I, DataOutputStream, IOException> infoWriter) throws IOException {
        dos.writeInt(handle.getSubtaskIndex());
        infoWriter.accept(handle.getInfo(), dos);
        // Length-prefixed list of offsets.
        dos.writeInt(handle.getOffsets().size());
        for (long offset : handle.getOffsets()) {
            dos.writeLong(offset);
        }
        dos.writeLong(handle.getStateSize());
        MetadataV2V3SerializerBase.serializeStreamStateHandle(handle.getDelegate(), dos);
    }

    /**
     * Reads the fields common to every channel state handle and assembles the concrete handle
     * through {@code handleBuilder}.
     *
     * @param infoReader reads the channel info; invoked right after the subtask index
     * @param handleBuilder builds the handle from subtask index, info, delegate handle and
     *     offsets/size meta info
     * @param dis the stream to read from
     * @param context passed through to the delegate stream state handle deserialization
     * @return the handle produced by {@code handleBuilder}
     * @throws IOException if reading fails or the encoded offsets count is negative
     */
    private static <Info, Handle extends AbstractChannelStateHandle<Info>> Handle deserializeChannelStateHandle(FunctionWithException<DataInputStream, Info, IOException> infoReader, QuadFunction<Integer, Info, StreamStateHandle, AbstractChannelStateHandle.StateContentMetaInfo, Handle> handleBuilder, DataInputStream dis, MetadataV2V3SerializerBase.DeserializationContext context) throws IOException {
        int subtaskIndex = dis.readInt();
        Info info = infoReader.apply(dis);
        int offsetsSize = dis.readInt();
        // The count comes straight from the stream; a negative value can only mean corrupted
        // data, so report it as an I/O problem instead of letting ArrayList's constructor throw
        // an unchecked IllegalArgumentException.
        if (offsetsSize < 0) {
            throw new IOException("Corrupted channel state handle: negative offsets count " + offsetsSize);
        }
        ArrayList<Long> offsets = new ArrayList<>(offsetsSize);
        for (int i = 0; i < offsetsSize; ++i) {
            offsets.add(dis.readLong());
        }
        long size = dis.readLong();
        // The delegate handle is the last element of the layout and must be read after the size.
        StreamStateHandle delegate = MetadataV2V3SerializerBase.deserializeStreamStateHandle(dis, context);
        return handleBuilder.apply(subtaskIndex, info, delegate, new AbstractChannelStateHandle.StateContentMetaInfo(offsets, size));
    }
}

