/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.resettable;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.operators.resettable.ReusingBlockResettableIterator;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.testutils.recordutils.RecordSerializer;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class ReusingBlockResettableIteratorTest {
    private static final int MEMORY_CAPACITY = 393216;
    private static final int NUM_VALUES = 20000;
    private MemoryManager memman;
    private Iterator<Record> reader;
    private List<Record> objects;
    private final TypeSerializer<Record> serializer = RecordSerializer.get();

    ReusingBlockResettableIteratorTest() {
    }

    @BeforeEach
    void startup() {
        this.memman = MemoryManagerBuilder.newBuilder().setMemorySize(393216L).build();
        this.objects = new ArrayList<Record>(20000);
        for (int i = 0; i < 20000; ++i) {
            this.objects.add(new Record((Value)new IntValue(i)));
        }
        this.reader = this.objects.iterator();
    }

    @AfterEach
    void shutdown() {
        this.objects = null;
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.memman.verifyEmpty()).withFailMessage("A memory leak has occurred: Not all memory was properly returned to the memory manager.", new Object[0])).isTrue();
        this.memman.shutdown();
        this.memman = null;
    }

    @Test
    void testSerialBlockResettableIterator() throws Exception {
        DummyInvokable memOwner = new DummyInvokable();
        ReusingBlockResettableIterator iterator = new ReusingBlockResettableIterator(this.memman, this.reader, this.serializer, 1, (AbstractInvokable)memOwner);
        iterator.open();
        int lower = 0;
        int upper = 0;
        do {
            upper = lower = upper;
            while (iterator.hasNext()) {
                Record target = (Record)iterator.next();
                int val = ((IntValue)target.getField(0, IntValue.class)).getValue();
                Assertions.assertThat((int)val).isEqualTo(upper++);
            }
            for (int i = 0; i < 5; ++i) {
                iterator.reset();
                int count = 0;
                while (iterator.hasNext()) {
                    Record target = (Record)iterator.next();
                    int val = ((IntValue)target.getField(0, IntValue.class)).getValue();
                    Assertions.assertThat((int)val).isEqualTo(lower + count++);
                }
                Assertions.assertThat((int)count).isEqualTo(upper - lower);
            }
        } while (iterator.nextBlock());
        Assertions.assertThat((int)upper).isEqualTo(20000);
        iterator.close();
    }

    @Test
    void testDoubleBufferedBlockResettableIterator() throws Exception {
        DummyInvokable memOwner = new DummyInvokable();
        ReusingBlockResettableIterator iterator = new ReusingBlockResettableIterator(this.memman, this.reader, this.serializer, 2, (AbstractInvokable)memOwner);
        iterator.open();
        int lower = 0;
        int upper = 0;
        do {
            upper = lower = upper;
            while (iterator.hasNext()) {
                Record target = (Record)iterator.next();
                int val = ((IntValue)target.getField(0, IntValue.class)).getValue();
                Assertions.assertThat((int)val).isEqualTo(upper++);
            }
            for (int i = 0; i < 5; ++i) {
                iterator.reset();
                int count = 0;
                while (iterator.hasNext()) {
                    Record target = (Record)iterator.next();
                    int val = ((IntValue)target.getField(0, IntValue.class)).getValue();
                    Assertions.assertThat((int)val).isEqualTo(lower + count++);
                }
                Assertions.assertThat((int)count).isEqualTo(upper - lower);
            }
        } while (iterator.nextBlock());
        Assertions.assertThat((int)upper).isEqualTo(20000);
        iterator.close();
    }

    @Test
    void testTwelveFoldBufferedBlockResettableIterator() throws Exception {
        DummyInvokable memOwner = new DummyInvokable();
        ReusingBlockResettableIterator iterator = new ReusingBlockResettableIterator(this.memman, this.reader, this.serializer, 12, (AbstractInvokable)memOwner);
        iterator.open();
        int lower = 0;
        int upper = 0;
        do {
            upper = lower = upper;
            while (iterator.hasNext()) {
                Record target = (Record)iterator.next();
                int val = ((IntValue)target.getField(0, IntValue.class)).getValue();
                Assertions.assertThat((int)val).isEqualTo(upper++);
            }
            for (int i = 0; i < 5; ++i) {
                iterator.reset();
                int count = 0;
                while (iterator.hasNext()) {
                    Record target = (Record)iterator.next();
                    int val = ((IntValue)target.getField(0, IntValue.class)).getValue();
                    Assertions.assertThat((int)val).isEqualTo(lower + count++);
                }
                Assertions.assertThat((int)count).isEqualTo(upper - lower);
            }
        } while (iterator.nextBlock());
        Assertions.assertThat((int)upper).isEqualTo(20000);
        iterator.close();
    }
}

