package org.apache.flink.runtime.io.disk;

import java.io.EOFException;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/disk/ChannelViewsTest.class */
/**
 * Round-trip tests for {@link ChannelWriterOutputView} and {@link ChannelReaderInputView}.
 *
 * <p>Every test serializes generated {@code (Integer, String)} tuples into a channel through a
 * writer view, resets the generator, reads the records back through a reader view and compares
 * each read record with the regenerated one. After each test the memory manager is checked for
 * pages that were not returned.
 */
class ChannelViewsTest {
    private static final long SEED = 649180756312423613L;
    private static final int KEY_MAX = Integer.MAX_VALUE;
    private static final int VALUE_SHORT_LENGTH = 114;
    private static final int VALUE_LONG_LENGTH = 112 * 1024;
    private static final int NUM_PAIRS_SHORT = 1000000;
    private static final int NUM_PAIRS_LONG = 3000;
    private static final int MEMORY_SIZE = 1024 * 1024;
    private static final int MEMORY_PAGE_SIZE = 64 * 1024;
    private static final int NUM_MEMORY_SEGMENTS = 3;

    private static final String RECORD_MISMATCH_MESSAGE =
            "The re-generated and the read record do not match.";

    private final AbstractInvokable parentTask = new DummyInvokable();
    private IOManager ioManager;
    private MemoryManager memoryManager;

    /** Creates a fresh memory manager and I/O manager for the test about to run. */
    @BeforeEach
    void beforeTest() {
        this.memoryManager =
                MemoryManagerBuilder.newBuilder()
                        .setMemorySize(MEMORY_SIZE)
                        .setPageSize(MEMORY_PAGE_SIZE)
                        .build();
        this.ioManager = new IOManagerAsync();
    }

    /**
     * Closes the I/O manager, then asserts that the test returned all memory before shutting the
     * memory manager down.
     */
    @AfterEach
    void afterTest() throws Exception {
        this.ioManager.close();
        if (this.memoryManager != null) {
            Assertions.assertThat(this.memoryManager.verifyEmpty())
                    .withFailMessage(
                            "Memory leak: not all segments have been returned to the memory manager.")
                    .isTrue();
            this.memoryManager.shutdown();
            this.memoryManager = null;
        }
    }

    /** Writes and reads back {@link #NUM_PAIRS_SHORT} records with short values. */
    @Test
    void testWriteReadSmallRecords() throws Exception {
        TestData.TupleGenerator generator = newGenerator(VALUE_SHORT_LENGTH);
        FileIOChannel.ID channel = this.ioManager.createChannel();
        TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();

        int numBlocks =
                writeRecords(channel, generator, serializer, NUM_PAIRS_SHORT, NUM_MEMORY_SEGMENTS);

        // Raw type kept on purpose: parameterizing it needs the segment type, which this file
        // does not import.
        BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel);
        ChannelReaderInputView inView =
                new ChannelReaderInputView(
                        reader,
                        this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS),
                        numBlocks,
                        true);
        generator.reset();
        readAndVerifyRecords(inView, generator, serializer, NUM_PAIRS_SHORT);

        this.memoryManager.release(inView.close());
        reader.deleteChannel();
    }

    /** Writes and reads back {@link #NUM_PAIRS_LONG} records generated with the long value length. */
    @Test
    void testWriteAndReadLongRecords() throws Exception {
        TestData.TupleGenerator generator = newGenerator(VALUE_LONG_LENGTH);
        FileIOChannel.ID channel = this.ioManager.createChannel();
        TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();

        int numBlocks =
                writeRecords(channel, generator, serializer, NUM_PAIRS_LONG, NUM_MEMORY_SEGMENTS);

        // Raw type kept on purpose (see testWriteReadSmallRecords).
        BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel);
        ChannelReaderInputView inView =
                new ChannelReaderInputView(
                        reader,
                        this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS),
                        numBlocks,
                        true);
        generator.reset();
        readAndVerifyRecords(inView, generator, serializer, NUM_PAIRS_LONG);

        this.memoryManager.release(inView.close());
        reader.deleteChannel();
    }

    /**
     * Reads back every written record and then attempts one more read, which is expected to fail
     * with an {@link EOFException}.
     */
    @Test
    void testReadTooMany() throws Exception {
        TestData.TupleGenerator generator = newGenerator(VALUE_SHORT_LENGTH);
        FileIOChannel.ID channel = this.ioManager.createChannel();
        TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();

        int numBlocks =
                writeRecords(channel, generator, serializer, NUM_PAIRS_SHORT, NUM_MEMORY_SEGMENTS);

        // Raw type kept on purpose (see testWriteReadSmallRecords).
        BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel);
        ChannelReaderInputView inView =
                new ChannelReaderInputView(
                        reader,
                        this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS),
                        numBlocks,
                        true);
        generator.reset();
        readAndVerifyRecords(inView, generator, serializer, NUM_PAIRS_SHORT);

        // The over-read has to happen inside the callable: an empty callable never throws, so
        // the assertion below could not succeed and the behavior would go untested.
        Tuple2<Integer, String> surplusRecord = new Tuple2<>();
        Assertions.assertThatThrownBy(() -> serializer.deserialize(surplusRecord, inView))
                .withFailMessage("Expected an EOFException which did not occur.")
                .isInstanceOf(EOFException.class);

        this.memoryManager.release(inView.close());
        reader.deleteChannel();
    }

    /**
     * Reads all records back through a reader view that is constructed without the number of
     * blocks written.
     */
    @Test
    void testReadWithoutKnownBlockCount() throws Exception {
        TestData.TupleGenerator generator = newGenerator(VALUE_SHORT_LENGTH);
        FileIOChannel.ID channel = this.ioManager.createChannel();
        TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();

        // The block count is deliberately ignored here.
        writeRecords(channel, generator, serializer, NUM_PAIRS_SHORT, NUM_MEMORY_SEGMENTS);

        // Raw type kept on purpose (see testWriteReadSmallRecords).
        BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel);
        ChannelReaderInputView inView =
                new ChannelReaderInputView(
                        reader,
                        this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS),
                        true);
        generator.reset();
        readAndVerifyRecords(inView, generator, serializer, NUM_PAIRS_SHORT);

        this.memoryManager.release(inView.close());
        reader.deleteChannel();
    }

    /** Runs the round trip with a single memory page for the writer and for the reader. */
    @Test
    void testWriteReadOneBufferOnly() throws Exception {
        TestData.TupleGenerator generator = newGenerator(VALUE_SHORT_LENGTH);
        FileIOChannel.ID channel = this.ioManager.createChannel();
        TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();

        int numBlocks = writeRecords(channel, generator, serializer, NUM_PAIRS_SHORT, 1);

        // Raw type kept on purpose (see testWriteReadSmallRecords).
        BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel);
        ChannelReaderInputView inView =
                new ChannelReaderInputView(
                        reader,
                        this.memoryManager.allocatePages(this.parentTask, 1),
                        numBlocks,
                        true);
        generator.reset();
        readAndVerifyRecords(inView, generator, serializer, NUM_PAIRS_SHORT);

        this.memoryManager.release(inView.close());
        reader.deleteChannel();
    }

    /** Writes all records but reads back only the first half before closing the reader view. */
    @Test
    void testWriteReadNotAll() throws Exception {
        TestData.TupleGenerator generator = newGenerator(VALUE_SHORT_LENGTH);
        FileIOChannel.ID channel = this.ioManager.createChannel();
        TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();

        int numBlocks =
                writeRecords(channel, generator, serializer, NUM_PAIRS_SHORT, NUM_MEMORY_SEGMENTS);

        // Raw type kept on purpose (see testWriteReadSmallRecords).
        BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel);
        ChannelReaderInputView inView =
                new ChannelReaderInputView(
                        reader,
                        this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS),
                        numBlocks,
                        true);
        generator.reset();
        readAndVerifyRecords(inView, generator, serializer, NUM_PAIRS_SHORT / 2);

        this.memoryManager.release(inView.close());
        reader.deleteChannel();
    }

    /**
     * Creates the tuple generator shared by all tests: fixed seed, random keys up to
     * {@link #KEY_MAX}, random-length values.
     *
     * @param valueLength value-length setting handed to the generator
     */
    private static TestData.TupleGenerator newGenerator(int valueLength) {
        return new TestData.TupleGenerator(
                SEED,
                KEY_MAX,
                valueLength,
                TestData.TupleGenerator.KeyMode.RANDOM,
                TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
    }

    /**
     * Serializes {@code numRecords} generated records into {@code channel} through a writer view
     * backed by {@code numSegments} pages, closes the view and releases its pages.
     *
     * @return the block count reported by the writer view after it was closed
     */
    private int writeRecords(
            FileIOChannel.ID channel,
            TestData.TupleGenerator generator,
            TypeSerializer<Tuple2<Integer, String>> serializer,
            int numRecords,
            int numSegments)
            throws Exception {
        ChannelWriterOutputView outView =
                new ChannelWriterOutputView(
                        this.ioManager.createBlockChannelWriter(channel),
                        this.memoryManager.allocatePages(this.parentTask, numSegments),
                        MEMORY_PAGE_SIZE);
        Tuple2<Integer, String> record = new Tuple2<>();
        for (int i = 0; i < numRecords; i++) {
            generator.next(record);
            serializer.serialize(record, outView);
        }
        this.memoryManager.release(outView.close());
        return outView.getBlockCount();
    }

    /**
     * Deserializes {@code numRecords} records from {@code inView} and checks each one against the
     * next record produced by {@code generator}.
     */
    private static void readAndVerifyRecords(
            ChannelReaderInputView inView,
            TestData.TupleGenerator generator,
            TypeSerializer<Tuple2<Integer, String>> serializer,
            int numRecords)
            throws Exception {
        Tuple2<Integer, String> expectedRecord = new Tuple2<>();
        Tuple2<Integer, String> readRecord = new Tuple2<>();
        for (int i = 0; i < numRecords; i++) {
            generator.next(expectedRecord);
            serializer.deserialize(readRecord, inView);
            assertReadRecordMatchRegenerated(readRecord, expectedRecord);
        }
    }

    /**
     * Asserts that the key and the value of the record read from the channel equal those of the
     * regenerated record.
     *
     * @param readRecord record obtained from the reader view
     * @param expectedRecord record produced again by the generator
     */
    private static void assertReadRecordMatchRegenerated(
            Tuple2<Integer, String> readRecord, Tuple2<Integer, String> expectedRecord) {
        Assertions.assertThat(readRecord.f0)
                .withFailMessage(RECORD_MISMATCH_MESSAGE)
                .isEqualTo(expectedRecord.f0);
        Assertions.assertThat(readRecord.f1)
                .withFailMessage(RECORD_MISMATCH_MESSAGE)
                .isEqualTo(expectedRecord.f1);
    }
}
