/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.savepoint;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.Preconditions;

@Internal
@VisibleForTesting
public class SavepointV2Serializer
implements SavepointSerializer<SavepointV2> {
    private static final int MASTER_STATE_MAGIC_NUMBER = -915728746;
    private static final byte NULL_HANDLE = 0;
    private static final byte BYTE_STREAM_STATE_HANDLE = 1;
    private static final byte FILE_STREAM_STATE_HANDLE = 2;
    private static final byte KEY_GROUPS_HANDLE = 3;
    private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
    private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5;
    public static final SavepointV2Serializer INSTANCE = new SavepointV2Serializer();

    private SavepointV2Serializer() {
    }

    @Override
    public void serialize(SavepointV2 checkpointMetadata, DataOutputStream dos) throws IOException {
        dos.writeLong(checkpointMetadata.getCheckpointId());
        Collection<MasterState> masterStates = checkpointMetadata.getMasterStates();
        dos.writeInt(masterStates.size());
        for (MasterState ms : masterStates) {
            this.serializeMasterState(ms, dos);
        }
        Collection<OperatorState> operatorStates = checkpointMetadata.getOperatorStates();
        dos.writeInt(operatorStates.size());
        for (OperatorState operatorState : operatorStates) {
            dos.writeLong(operatorState.getOperatorID().getLowerPart());
            dos.writeLong(operatorState.getOperatorID().getUpperPart());
            int parallelism = operatorState.getParallelism();
            dos.writeInt(parallelism);
            dos.writeInt(operatorState.getMaxParallelism());
            dos.writeInt(1);
            Map<Integer, OperatorSubtaskState> subtaskStateMap = operatorState.getSubtaskStates();
            dos.writeInt(subtaskStateMap.size());
            for (Map.Entry<Integer, OperatorSubtaskState> entry : subtaskStateMap.entrySet()) {
                dos.writeInt(entry.getKey());
                SavepointV2Serializer.serializeSubtaskState(entry.getValue(), dos);
            }
        }
    }

    @Override
    public SavepointV2 deserialize(DataInputStream dis, ClassLoader cl) throws IOException {
        List<MasterState> masterStates;
        long checkpointId = dis.readLong();
        if (checkpointId < 0L) {
            throw new IOException("invalid checkpoint ID: " + checkpointId);
        }
        int numMasterStates = dis.readInt();
        if (numMasterStates == 0) {
            masterStates = Collections.emptyList();
        } else if (numMasterStates > 0) {
            masterStates = new ArrayList(numMasterStates);
            for (int i = 0; i < numMasterStates; ++i) {
                masterStates.add(this.deserializeMasterState(dis));
            }
        } else {
            throw new IOException("invalid number of master states: " + numMasterStates);
        }
        int numTaskStates = dis.readInt();
        ArrayList<OperatorState> operatorStates = new ArrayList<OperatorState>(numTaskStates);
        for (int i = 0; i < numTaskStates; ++i) {
            OperatorID jobVertexId = new OperatorID(dis.readLong(), dis.readLong());
            int parallelism = dis.readInt();
            int maxParallelism = dis.readInt();
            int chainLength = dis.readInt();
            OperatorState taskState = new OperatorState(jobVertexId, parallelism, maxParallelism);
            operatorStates.add(taskState);
            int numSubTaskStates = dis.readInt();
            for (int j = 0; j < numSubTaskStates; ++j) {
                int subtaskIndex = dis.readInt();
                OperatorSubtaskState subtaskState = SavepointV2Serializer.deserializeSubtaskState(dis);
                taskState.putState(subtaskIndex, subtaskState);
            }
        }
        return new SavepointV2(checkpointId, operatorStates, masterStates);
    }

    private void serializeMasterState(MasterState state, DataOutputStream dos) throws IOException {
        dos.writeInt(-915728746);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(state.version());
        out.writeUTF(state.name());
        byte[] bytes = state.bytes();
        out.writeInt(bytes.length);
        out.write(bytes, 0, bytes.length);
        out.close();
        byte[] data = baos.toByteArray();
        dos.writeInt(data.length);
        dos.write(data, 0, data.length);
    }

    private MasterState deserializeMasterState(DataInputStream dis) throws IOException {
        int magicNumber = dis.readInt();
        if (magicNumber != -915728746) {
            throw new IOException("incorrect magic number in master styte byte sequence");
        }
        int numBytes = dis.readInt();
        if (numBytes <= 0) {
            throw new IOException("found zero or negative length for master state bytes");
        }
        byte[] data = new byte[numBytes];
        dis.readFully(data);
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int version = in.readInt();
        String name = in.readUTF();
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        if (in.read() != -1) {
            throw new IOException("found trailing bytes in master state");
        }
        return new MasterState(name, bytes, version);
    }

    private static <T> T extractSingleton(Collection<T> collection) {
        if (collection == null || collection.isEmpty()) {
            return null;
        }
        if (collection.size() == 1) {
            return collection.iterator().next();
        }
        throw new IllegalStateException("Expected singleton collection, but found size: " + collection.size());
    }

    private static void serializeSubtaskState(OperatorSubtaskState subtaskState, DataOutputStream dos) throws IOException {
        OperatorStateHandle operatorStateFromStream;
        dos.writeLong(-1L);
        int len = 0;
        dos.writeInt(len);
        OperatorStateHandle operatorStateBackend = SavepointV2Serializer.extractSingleton(subtaskState.getManagedOperatorState());
        len = operatorStateBackend != null ? 1 : 0;
        dos.writeInt(len);
        if (len == 1) {
            SavepointV2Serializer.serializeOperatorStateHandle(operatorStateBackend, dos);
        }
        len = (operatorStateFromStream = SavepointV2Serializer.extractSingleton(subtaskState.getRawOperatorState())) != null ? 1 : 0;
        dos.writeInt(len);
        if (len == 1) {
            SavepointV2Serializer.serializeOperatorStateHandle(operatorStateFromStream, dos);
        }
        KeyedStateHandle keyedStateBackend = SavepointV2Serializer.extractSingleton(subtaskState.getManagedKeyedState());
        SavepointV2Serializer.serializeKeyedStateHandle(keyedStateBackend, dos);
        KeyedStateHandle keyedStateStream = SavepointV2Serializer.extractSingleton(subtaskState.getRawKeyedState());
        SavepointV2Serializer.serializeKeyedStateHandle(keyedStateStream, dos);
    }

    private static OperatorSubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
        long ignoredDuration = dis.readLong();
        int len = dis.readInt();
        if (SavepointSerializers.FAIL_WHEN_LEGACY_STATE_DETECTED) {
            Preconditions.checkState((len == 0 ? 1 : 0) != 0, (Object)"Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is no longer supported starting from Flink 1.4. Please rewrite your job to use 'CheckpointedFunction' instead!");
        } else {
            for (int i = 0; i < len; ++i) {
                SavepointV2Serializer.deserializeStreamStateHandle(dis);
            }
        }
        len = dis.readInt();
        OperatorStateHandle operatorStateBackend = len == 0 ? null : SavepointV2Serializer.deserializeOperatorStateHandle(dis);
        len = dis.readInt();
        OperatorStateHandle operatorStateStream = len == 0 ? null : SavepointV2Serializer.deserializeOperatorStateHandle(dis);
        KeyedStateHandle keyedStateBackend = SavepointV2Serializer.deserializeKeyedStateHandle(dis);
        KeyedStateHandle keyedStateStream = SavepointV2Serializer.deserializeKeyedStateHandle(dis);
        return new OperatorSubtaskState(operatorStateBackend, operatorStateStream, keyedStateBackend, keyedStateStream);
    }

    @VisibleForTesting
    public static void serializeKeyedStateHandle(KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
        if (stateHandle == null) {
            dos.writeByte(0);
        } else if (stateHandle instanceof KeyGroupsStateHandle) {
            KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle)stateHandle;
            dos.writeByte(3);
            dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup());
            dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
            for (int keyGroup : keyGroupsStateHandle.getKeyGroupRange()) {
                dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup));
            }
            SavepointV2Serializer.serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
        } else if (stateHandle instanceof IncrementalRemoteKeyedStateHandle) {
            IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle = (IncrementalRemoteKeyedStateHandle)stateHandle;
            dos.writeByte(5);
            dos.writeLong(incrementalKeyedStateHandle.getCheckpointId());
            dos.writeUTF(String.valueOf(incrementalKeyedStateHandle.getBackendIdentifier()));
            dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getStartKeyGroup());
            dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
            SavepointV2Serializer.serializeStreamStateHandle(incrementalKeyedStateHandle.getMetaStateHandle(), dos);
            SavepointV2Serializer.serializeStreamStateHandleMap(incrementalKeyedStateHandle.getSharedState(), dos);
            SavepointV2Serializer.serializeStreamStateHandleMap(incrementalKeyedStateHandle.getPrivateState(), dos);
        } else {
            throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass());
        }
    }

    private static void serializeStreamStateHandleMap(Map<StateHandleID, StreamStateHandle> map, DataOutputStream dos) throws IOException {
        dos.writeInt(map.size());
        for (Map.Entry<StateHandleID, StreamStateHandle> entry : map.entrySet()) {
            dos.writeUTF(entry.getKey().toString());
            SavepointV2Serializer.serializeStreamStateHandle(entry.getValue(), dos);
        }
    }

    private static Map<StateHandleID, StreamStateHandle> deserializeStreamStateHandleMap(DataInputStream dis) throws IOException {
        int size = dis.readInt();
        HashMap<StateHandleID, StreamStateHandle> result = new HashMap<StateHandleID, StreamStateHandle>(size);
        for (int i = 0; i < size; ++i) {
            StateHandleID stateHandleID = new StateHandleID(dis.readUTF());
            StreamStateHandle stateHandle = SavepointV2Serializer.deserializeStreamStateHandle(dis);
            result.put(stateHandleID, stateHandle);
        }
        return result;
    }

    @VisibleForTesting
    public static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
        byte type = dis.readByte();
        if (0 == type) {
            return null;
        }
        if (3 == type) {
            int startKeyGroup = dis.readInt();
            int numKeyGroups = dis.readInt();
            KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
            long[] offsets = new long[numKeyGroups];
            for (int i = 0; i < numKeyGroups; ++i) {
                offsets[i] = dis.readLong();
            }
            KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, offsets);
            StreamStateHandle stateHandle = SavepointV2Serializer.deserializeStreamStateHandle(dis);
            return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
        }
        if (5 == type) {
            UUID uuid;
            long checkpointId = dis.readLong();
            String backendId = dis.readUTF();
            int startKeyGroup = dis.readInt();
            int numKeyGroups = dis.readInt();
            KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
            StreamStateHandle metaDataStateHandle = SavepointV2Serializer.deserializeStreamStateHandle(dis);
            Map<StateHandleID, StreamStateHandle> sharedStates = SavepointV2Serializer.deserializeStreamStateHandleMap(dis);
            Map<StateHandleID, StreamStateHandle> privateStates = SavepointV2Serializer.deserializeStreamStateHandleMap(dis);
            try {
                uuid = UUID.fromString(backendId);
            }
            catch (Exception ex) {
                uuid = UUID.nameUUIDFromBytes(backendId.getBytes(StandardCharsets.UTF_8));
            }
            return new IncrementalRemoteKeyedStateHandle(uuid, keyGroupRange, checkpointId, sharedStates, privateStates, metaDataStateHandle);
        }
        throw new IllegalStateException("Reading invalid KeyedStateHandle, type: " + type);
    }

    @VisibleForTesting
    public static void serializeOperatorStateHandle(OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
        if (stateHandle != null) {
            dos.writeByte(4);
            Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsetsMap = stateHandle.getStateNameToPartitionOffsets();
            dos.writeInt(partitionOffsetsMap.size());
            for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> entry : partitionOffsetsMap.entrySet()) {
                dos.writeUTF(entry.getKey());
                OperatorStateHandle.StateMetaInfo stateMetaInfo = entry.getValue();
                int mode = stateMetaInfo.getDistributionMode().ordinal();
                dos.writeByte(mode);
                long[] offsets = stateMetaInfo.getOffsets();
                dos.writeInt(offsets.length);
                for (long offset : offsets) {
                    dos.writeLong(offset);
                }
            }
            SavepointV2Serializer.serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
        } else {
            dos.writeByte(0);
        }
    }

    @VisibleForTesting
    public static OperatorStateHandle deserializeOperatorStateHandle(DataInputStream dis) throws IOException {
        byte type = dis.readByte();
        if (0 == type) {
            return null;
        }
        if (4 == type) {
            int mapSize = dis.readInt();
            HashMap<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<String, OperatorStateHandle.StateMetaInfo>(mapSize);
            for (int i = 0; i < mapSize; ++i) {
                String key = dis.readUTF();
                byte modeOrdinal = dis.readByte();
                OperatorStateHandle.Mode mode = OperatorStateHandle.Mode.values()[modeOrdinal];
                long[] offsets = new long[dis.readInt()];
                for (int j = 0; j < offsets.length; ++j) {
                    offsets[j] = dis.readLong();
                }
                OperatorStateHandle.StateMetaInfo metaInfo = new OperatorStateHandle.StateMetaInfo(offsets, mode);
                offsetsMap.put(key, metaInfo);
            }
            StreamStateHandle stateHandle = SavepointV2Serializer.deserializeStreamStateHandle(dis);
            return new OperatorStreamStateHandle(offsetsMap, stateHandle);
        }
        throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type);
    }

    @VisibleForTesting
    public static void serializeStreamStateHandle(StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
        if (stateHandle == null) {
            dos.writeByte(0);
        } else if (stateHandle instanceof FileStateHandle) {
            dos.writeByte(2);
            FileStateHandle fileStateHandle = (FileStateHandle)stateHandle;
            dos.writeLong(stateHandle.getStateSize());
            dos.writeUTF(fileStateHandle.getFilePath().toString());
        } else if (stateHandle instanceof ByteStreamStateHandle) {
            dos.writeByte(1);
            ByteStreamStateHandle byteStreamStateHandle = (ByteStreamStateHandle)stateHandle;
            dos.writeUTF(byteStreamStateHandle.getHandleName());
            byte[] internalData = byteStreamStateHandle.getData();
            dos.writeInt(internalData.length);
            dos.write(byteStreamStateHandle.getData());
        } else {
            throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass());
        }
        dos.flush();
    }

    public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
        int type = dis.read();
        if (0 == type) {
            return null;
        }
        if (2 == type) {
            long size = dis.readLong();
            String pathString = dis.readUTF();
            return new FileStateHandle(new Path(pathString), size);
        }
        if (1 == type) {
            String handleName = dis.readUTF();
            int numBytes = dis.readInt();
            byte[] data = new byte[numBytes];
            dis.readFully(data);
            return new ByteStreamStateHandle(handleName, data);
        }
        throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
    }
}

