/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Random;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateEntry;
import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
import org.apache.flink.runtime.state.heap.CopyOnWriteStateTable;
import org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot;
import org.apache.flink.runtime.state.heap.MockInternalKeyContext;
import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders;
import org.junit.Assert;
import org.junit.Test;

/**
 * Checks that snapshots written by {@link CopyOnWriteStateTable} can be read back into a
 * {@link NestedMapsStateTable} and vice versa, i.e. that both state table implementations
 * agree on the serialized key-group format.
 */
public class StateTableSnapshotCompatibilityTest {
    /**
     * Format version handed to {@link StateTableByKeyGroupReaders#readerForVersion}.
     * NOTE(review): the literal was inlined by the compiler; presumably it mirrors the
     * serialization-proxy version constant of the keyed backend - confirm before changing.
     */
    private static final int SNAPSHOT_FORMAT_VERSION = 6;

    /** Initial capacity (1 MiB) of the in-memory buffer used to hold a serialized snapshot. */
    private static final int SNAPSHOT_BUFFER_SIZE = 0x100000;

    private final TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;

    /**
     * Fills a {@link CopyOnWriteStateTable} with pseudo-random data, round-trips it through a
     * {@link NestedMapsStateTable} and back into a fresh {@link CopyOnWriteStateTable}, and
     * asserts after each hop that size and per-entry state are identical.
     *
     * @throws IOException if writing or reading a snapshot fails
     */
    @Test
    public void checkCompatibleSerializationFormats() throws IOException {
        // Fixed seed keeps the generated test data reproducible.
        final Random random = new Random(42L);
        final RegisteredKeyValueStateBackendMetaInfo<Integer, ArrayList<Integer>> metaInfo =
                new RegisteredKeyValueStateBackendMetaInfo<>(
                        StateDescriptor.Type.UNKNOWN,
                        "test",
                        IntSerializer.INSTANCE,
                        new ArrayListSerializer<>(IntSerializer.INSTANCE));
        final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>();

        CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> cowStateTable =
                new CopyOnWriteStateTable<>(keyContext, metaInfo, this.keySerializer);
        for (int i = 0; i < 100; ++i) {
            final ArrayList<Integer> list = new ArrayList<>(5);
            final int end = random.nextInt(5);
            for (int j = 0; j < end; ++j) {
                list.add(random.nextInt(100));
            }
            // The order of the random draws (elements, key, namespace) determines the data set.
            keyContext.setCurrentKey(random.nextInt(10));
            cowStateTable.put(random.nextInt(2), list);
        }

        // First hop: CopyOnWriteStateTable -> NestedMapsStateTable.
        // The snapshot is held via the StateSnapshot interface because the two table
        // implementations return different concrete snapshot classes.
        final NestedMapsStateTable<Integer, Integer, ArrayList<Integer>> nestedMapsStateTable =
                new NestedMapsStateTable<>(keyContext, metaInfo, this.keySerializer);
        StateSnapshot snapshot = cowStateTable.stateSnapshot();
        try {
            this.restoreStateTableFromSnapshot(nestedMapsStateTable, snapshot, keyContext.getKeyGroupRange());
        } finally {
            // Release even when the restore fails so the snapshot is not leaked.
            snapshot.release();
        }
        Assert.assertEquals(cowStateTable.size(), nestedMapsStateTable.size());
        for (StateEntry<Integer, Integer, ArrayList<Integer>> entry : cowStateTable) {
            Assert.assertEquals(
                    entry.getState(),
                    nestedMapsStateTable.get(entry.getKey(), entry.getNamespace()));
        }

        // Second hop: NestedMapsStateTable -> fresh CopyOnWriteStateTable.
        snapshot = nestedMapsStateTable.stateSnapshot();
        cowStateTable = new CopyOnWriteStateTable<>(keyContext, metaInfo, this.keySerializer);
        try {
            this.restoreStateTableFromSnapshot(cowStateTable, snapshot, keyContext.getKeyGroupRange());
        } finally {
            snapshot.release();
        }
        Assert.assertEquals(nestedMapsStateTable.size(), cowStateTable.size());
        for (StateEntry<Integer, Integer, ArrayList<Integer>> entry : cowStateTable) {
            Assert.assertEquals(
                    nestedMapsStateTable.get(entry.getKey(), entry.getNamespace()),
                    entry.getState());
        }
    }

    /**
     * Serializes every key group of {@code snapshot} into an in-memory buffer and reads the
     * buffer back into {@code stateTable}.
     *
     * @param stateTable table that receives the deserialized mappings
     * @param snapshot snapshot to serialize; the caller remains responsible for releasing it
     * @param keyGroupRange key groups to write and then read, in iteration order
     * @throws IOException if writing to or reading from the buffer fails
     */
    private void restoreStateTableFromSnapshot(StateTable<Integer, Integer, ArrayList<Integer>> stateTable, StateSnapshot snapshot, KeyGroupRange keyGroupRange) throws IOException {
        final ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(SNAPSHOT_BUFFER_SIZE);
        final DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(out);
        final StateSnapshot.StateKeyGroupWriter keyGroupWriter = snapshot.getKeyGroupWriter();
        for (Integer keyGroup : keyGroupRange) {
            keyGroupWriter.writeStateInKeyGroup(outputView, keyGroup);
        }

        // Expose only the bytes actually written: getBuf() returns the whole backing array,
        // so without the bound a reader that over-reads would silently consume zero padding.
        final ByteArrayInputStreamWithPos in =
                new ByteArrayInputStreamWithPos(out.getBuf(), 0, out.getPosition());
        final DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(in);
        final StateSnapshotKeyGroupReader keyGroupReader =
                StateTableByKeyGroupReaders.readerForVersion(stateTable, SNAPSHOT_FORMAT_VERSION);
        for (Integer keyGroup : keyGroupRange) {
            keyGroupReader.readMappingsInKeyGroup(inputView, keyGroup);
        }
    }
}

