/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.TestMemoryCheckpointOutputStream;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class OperatorStateOutputCheckpointStreamTest {
    private static final int STREAM_CAPACITY = 128;

    OperatorStateOutputCheckpointStreamTest() {
    }

    private static OperatorStateCheckpointOutputStream createStream() throws IOException {
        TestMemoryCheckpointOutputStream checkStream = new TestMemoryCheckpointOutputStream(128);
        return new OperatorStateCheckpointOutputStream((CheckpointStateOutputStream)checkStream);
    }

    private OperatorStateHandle writeAllTestKeyGroups(OperatorStateCheckpointOutputStream stream, int numPartitions) throws Exception {
        DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper((OutputStream)stream);
        for (int i = 0; i < numPartitions; ++i) {
            Assertions.assertThat((int)stream.getNumberOfPartitions()).isEqualTo(i);
            stream.startNewPartition();
            dov.writeInt(i);
        }
        return stream.closeAndGetHandle();
    }

    @Test
    void testCloseNotPropagated() throws Exception {
        OperatorStateCheckpointOutputStream stream = OperatorStateOutputCheckpointStreamTest.createStream();
        TestMemoryCheckpointOutputStream innerStream = (TestMemoryCheckpointOutputStream)stream.getDelegate();
        stream.close();
        Assertions.assertThat((boolean)innerStream.isClosed()).isFalse();
        innerStream.close();
    }

    @Test
    void testEmptyOperatorStream() throws Exception {
        OperatorStateCheckpointOutputStream stream = OperatorStateOutputCheckpointStreamTest.createStream();
        TestMemoryCheckpointOutputStream innerStream = (TestMemoryCheckpointOutputStream)stream.getDelegate();
        OperatorStateHandle emptyHandle = stream.closeAndGetHandle();
        Assertions.assertThat((boolean)innerStream.isClosed()).isTrue();
        Assertions.assertThat((int)stream.getNumberOfPartitions()).isZero();
        Assertions.assertThat((Object)emptyHandle).isNull();
    }

    @Test
    void testWriteReadRoundtrip() throws Exception {
        int numPartitions = 3;
        OperatorStateCheckpointOutputStream stream = OperatorStateOutputCheckpointStreamTest.createStream();
        OperatorStateHandle fullHandle = this.writeAllTestKeyGroups(stream, numPartitions);
        Assertions.assertThat((Object)fullHandle).isNotNull();
        Map stateNameToPartitionOffsets = fullHandle.getStateNameToPartitionOffsets();
        for (Map.Entry entry : stateNameToPartitionOffsets.entrySet()) {
            Assertions.assertThat((Comparable)((OperatorStateHandle.StateMetaInfo)entry.getValue()).getDistributionMode()).isEqualTo((Object)OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
        }
        OperatorStateOutputCheckpointStreamTest.verifyRead(fullHandle, numPartitions);
    }

    private static void verifyRead(OperatorStateHandle fullHandle, int numPartitions) throws IOException {
        int count = 0;
        try (FSDataInputStream in = fullHandle.openInputStream();){
            OperatorStateHandle.StateMetaInfo metaInfo = (OperatorStateHandle.StateMetaInfo)fullHandle.getStateNameToPartitionOffsets().get("_default_");
            long[] offsets = metaInfo.getOffsets();
            Assertions.assertThat((long[])offsets).isNotNull();
            DataInputViewStreamWrapper div = new DataInputViewStreamWrapper((InputStream)in);
            for (int i = 0; i < numPartitions; ++i) {
                in.seek(offsets[i]);
                Assertions.assertThat((int)div.readInt()).isEqualTo(i);
                ++count;
            }
        }
        Assertions.assertThat((int)count).isEqualTo(numPartitions);
    }
}

