/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateByteBuffer;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializerImpl;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.junit.Assert;
import org.junit.Test;

/**
 * Round-trip tests for {@link ChannelStateSerializerImpl}: data written through the serializer
 * must be read back unchanged, whether it was written from one buffer, from several buffers at
 * once, or read into a {@link BufferBuilder}.
 */
public class ChannelStateSerializerImplTest {
    /** Shared source of random payloads; avoids creating a new generator per call. */
    private static final Random RANDOM = new Random();

    /** Writes a single buffer of random data and checks that reading yields the same bytes. */
    @Test
    public void testReadWrite() throws IOException {
        byte[] data = generateData(123);
        ChannelStateSerializerImpl serializer = new ChannelStateSerializerImpl();
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream(data.length)) {
            write(data, serializer, baos);
            readAndCheck(data, serializer, new ByteArrayInputStream(baos.toByteArray()));
        }
    }

    /**
     * Writes 0, 1, 2 and 3 buffers per {@code writeData} call and checks that each call is read
     * back as one record whose length is the sum of the buffer sizes and whose content is the
     * payload repeated once per buffer.
     */
    @Test
    public void testReadWriteWithMultipleBuffers() throws IOException {
        int bufSize = 10;
        int[] numBuffersToWriteAtOnce = {0, 1, 2, 3};
        byte[] data = generateData(bufSize);

        ChannelStateSerializerImpl s = new ChannelStateSerializerImpl();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            s.writeHeader(out);
            for (int count : numBuffersToWriteAtOnce) {
                // One buffer instance fills every slot of the array, exactly as before; it is
                // released after the write, mirroring what write(...) does for its buffer.
                NetworkBuffer shared = getBuffer(data);
                try {
                    Buffer[] buffers = new Buffer[count];
                    Arrays.fill(buffers, shared);
                    s.writeData(out, buffers);
                } finally {
                    shared.release();
                }
            }
        }

        ChannelStateSerializerImpl d = new ChannelStateSerializerImpl();
        InputStream is = new ByteArrayInputStream(baos.toByteArray());
        d.readHeader(is);
        for (int count : numBuffersToWriteAtOnce) {
            int expected = bufSize * count;
            Assert.assertEquals(expected, d.readLength(is));
            byte[] readBuf = new byte[expected];
            Assert.assertEquals(
                    expected,
                    d.readData(is, ChannelStateByteBuffer.wrap(readBuf), Integer.MAX_VALUE));
            // Every bufSize-sized slice of the record must equal the original payload.
            for (int i = 0; i < count; ++i) {
                Assert.assertArrayEquals(
                        data, Arrays.copyOfRange(readBuf, i * bufSize, (i + 1) * bufSize));
            }
        }
    }

    /**
     * Reads raw bytes into a {@link BufferBuilder} and checks that the builder is not finished by
     * the read and that the built buffer contains exactly the bytes that were supplied.
     */
    @Test
    public void testReadToBufferBuilder() throws IOException {
        byte[] data = generateData(100);
        BufferBuilder bufferBuilder =
                new BufferBuilder(
                        MemorySegmentFactory.allocateUnpooledSegment(data.length, null),
                        FreeingBufferRecycler.INSTANCE);
        BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();

        new ChannelStateSerializerImpl()
                .readData(
                        new ByteArrayInputStream(data),
                        ChannelStateByteBuffer.wrap(bufferBuilder),
                        Integer.MAX_VALUE);

        Assert.assertFalse(bufferBuilder.isFinished());

        bufferBuilder.finish();
        Buffer buffer = bufferConsumer.build();

        Assert.assertEquals(data.length, buffer.readableBytes());
        byte[] actual = new byte[buffer.readableBytes()];
        buffer.asByteBuf().readBytes(actual);
        Assert.assertArrayEquals(data, actual);
    }

    /**
     * Creates a {@link NetworkBuffer} backed by an unpooled segment of {@code data.length} bytes
     * and writes {@code data} into it. The caller is responsible for releasing the buffer.
     *
     * @param data bytes to place in the buffer
     * @return the buffer holding {@code data}
     */
    private NetworkBuffer getBuffer(byte[] data) {
        NetworkBuffer buffer =
                new NetworkBuffer(
                        MemorySegmentFactory.allocateUnpooledSegment(data.length, null),
                        FreeingBufferRecycler.INSTANCE);
        buffer.writeBytes(data);
        return buffer;
    }

    /**
     * Copies all currently readable bytes of {@code buffer} into a new array.
     *
     * @param buffer buffer to read from
     * @return array of length {@code buffer.readableBytes()} with the bytes read
     */
    private byte[] readBytes(NetworkBuffer buffer) {
        byte[] tmp = new byte[buffer.readableBytes()];
        buffer.readBytes(tmp);
        return tmp;
    }

    /**
     * Writes the serializer header followed by {@code data} (as a single buffer) to
     * {@code baos}. The stream is flushed but not closed, since it belongs to the caller; the
     * temporary buffer is always released.
     *
     * @param data payload to write
     * @param serializer serializer under test
     * @param baos destination stream
     * @throws IOException if writing fails
     */
    private void write(byte[] data, ChannelStateSerializerImpl serializer, OutputStream baos)
            throws IOException {
        DataOutputStream out = new DataOutputStream(baos);
        serializer.writeHeader(out);
        NetworkBuffer buffer =
                new NetworkBuffer(
                        MemorySegmentFactory.allocateUnpooledSegment(data.length),
                        FreeingBufferRecycler.INSTANCE);
        try {
            buffer.writeBytes(data);
            serializer.writeData(out, new Buffer[] {buffer});
            out.flush();
        } finally {
            buffer.release();
        }
    }

    /**
     * Reads the header, the record length and the record itself from {@code is} and asserts that
     * the length and the content match {@code data}. The temporary buffer is always released.
     *
     * @param data expected payload
     * @param serializer serializer under test
     * @param is stream positioned at the serializer header
     * @throws IOException if reading fails
     */
    private void readAndCheck(
            byte[] data, ChannelStateSerializerImpl serializer, ByteArrayInputStream is)
            throws IOException {
        serializer.readHeader(is);
        int size = serializer.readLength(is);
        Assert.assertEquals(data.length, size);
        NetworkBuffer buffer =
                new NetworkBuffer(
                        MemorySegmentFactory.allocateUnpooledSegment(data.length),
                        FreeingBufferRecycler.INSTANCE);
        try {
            int read = serializer.readData(is, ChannelStateByteBuffer.wrap((Buffer) buffer), size);
            Assert.assertEquals(size, read);
            Assert.assertArrayEquals(data, readBytes(buffer));
        } finally {
            buffer.release();
        }
    }

    /**
     * Generates {@code len} random bytes.
     *
     * @param len number of bytes to generate
     * @return a new array of length {@code len} filled with random bytes
     */
    static byte[] generateData(int len) {
        byte[] bytes = new byte[len];
        RANDOM.nextBytes(bytes);
        return bytes;
    }
}

