/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.RestoreOperation;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
import org.apache.flink.runtime.state.StateSnapshotRestore;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapMetaInfoRestoreOperation;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.heap.StateTableFactory;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Restore operation that rebuilds the registered key/value state tables and priority-queue
 * states of a heap keyed backend from a collection of {@link KeyGroupsStateHandle}s.
 *
 * <p>For every handle the operation reads the {@link KeyedBackendSerializationProxy} written at
 * the start of the stream, checks the key serializer (once, for the first non-null handle),
 * lets {@link HeapMetaInfoRestoreOperation} create or check the states described by the restored
 * meta information, and finally reads the data of every key-group that falls into this
 * operation's {@link KeyGroupRange}.
 *
 * @param <K> type of the key handled by the backend that is being restored.
 */
public class HeapRestoreOperation<K> implements RestoreOperation<Void> {
    private static final Logger LOG = LoggerFactory.getLogger(HeapRestoreOperation.class);

    /** State handles to restore from; {@code null} entries are skipped. */
    private final Collection<KeyedStateHandle> restoreStateHandles;
    /** Provider used to obtain the current key serializer and to register the restored snapshot. */
    private final StateSerializerProvider<K> keySerializerProvider;
    /** Class loader handed to the serialization proxy when reading the written meta data. */
    private final ClassLoader userCodeClassLoader;
    /** Key/value states by name; cleared at the start of {@link #restore()} and then repopulated. */
    private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
    /** Priority-queue states by name; cleared at the start of {@link #restore()} and then repopulated. */
    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
    /** Registry the opened input streams are registered with while they are being read. */
    private final CloseableRegistry cancelStreamRegistry;
    /** Only key-groups contained in this range are read from the handles. */
    @Nonnull
    private final KeyGroupRange keyGroupRange;
    /** Helper that creates or checks registered states for restored meta information. */
    private final HeapMetaInfoRestoreOperation<K> heapMetaInfoRestoreOperation;

    HeapRestoreOperation(
            @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
            StateSerializerProvider<K> keySerializerProvider,
            ClassLoader userCodeClassLoader,
            Map<String, StateTable<K, ?, ?>> registeredKVStates,
            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
            CloseableRegistry cancelStreamRegistry,
            HeapPriorityQueueSetFactory priorityQueueSetFactory,
            @Nonnull KeyGroupRange keyGroupRange,
            int numberOfKeyGroups,
            StateTableFactory<K> stateTableFactory,
            InternalKeyContext<K> keyContext) {
        this.restoreStateHandles = restoreStateHandles;
        this.keySerializerProvider = keySerializerProvider;
        this.userCodeClassLoader = userCodeClassLoader;
        this.registeredKVStates = registeredKVStates;
        this.registeredPQStates = registeredPQStates;
        this.cancelStreamRegistry = cancelStreamRegistry;
        this.keyGroupRange = keyGroupRange;
        this.heapMetaInfoRestoreOperation =
                new HeapMetaInfoRestoreOperation<>(
                        keySerializerProvider,
                        priorityQueueSetFactory,
                        keyGroupRange,
                        numberOfKeyGroups,
                        stateTableFactory,
                        keyContext);
    }

    /**
     * Clears the registered states and restores them from all non-null state handles.
     *
     * @return always {@code null}.
     * @throws StateMigrationException if the key serializer check for the first restored handle
     *     reports "compatible after migration" or "incompatible".
     * @throws Exception if a handle is not a {@link KeyGroupsStateHandle} or if reading any handle
     *     fails; failures are propagated to the caller, never suppressed.
     */
    @Override
    public Void restore() throws Exception {
        registeredKVStates.clear();
        registeredPQStates.clear();

        boolean keySerializerRestored = false;

        for (KeyedStateHandle keyedStateHandle : restoreStateHandles) {
            if (keyedStateHandle == null) {
                continue;
            }
            if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
                throw StateUtil.unexpectedStateHandleException(
                        KeyGroupsStateHandle.class, keyedStateHandle.getClass());
            }

            LOG.info("Starting to restore from state handle: {}.", keyedStateHandle);
            KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
            FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream();
            cancelStreamRegistry.registerCloseable(fsDataInputStream);

            try {
                DataInputViewStreamWrapper inView =
                        new DataInputViewStreamWrapper(fsDataInputStream);
                KeyedBackendSerializationProxy<K> serializationProxy =
                        new KeyedBackendSerializationProxy<>(userCodeClassLoader);
                serializationProxy.read(inView);

                if (!keySerializerRestored) {
                    // The key serializer is only checked once, against the first handle read.
                    // Fetch the current serializer before registering the restored snapshot so
                    // that it can be reported in the error message below.
                    TypeSerializer<K> currentSerializer =
                            keySerializerProvider.currentSchemaSerializer();
                    TypeSerializerSchemaCompatibility<K> keySerializerSchemaCompat =
                            keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(
                                    serializationProxy.getKeySerializerSnapshot());
                    if (keySerializerSchemaCompat.isCompatibleAfterMigration()
                            || keySerializerSchemaCompat.isIncompatible()) {
                        throw new StateMigrationException(
                                "The new key serializer ("
                                        + currentSerializer
                                        + ") must be compatible with the previous key serializer ("
                                        + keySerializerProvider.previousSchemaSerializer()
                                        + ").");
                    }
                    keySerializerRestored = true;
                }

                List<StateMetaInfoSnapshot> restoredMetaInfos =
                        serializationProxy.getStateMetaInfoSnapshots();
                Map<Integer, StateMetaInfoSnapshot> kvStatesById =
                        heapMetaInfoRestoreOperation.createOrCheckStateForMetaInfo(
                                restoredMetaInfos, registeredKVStates, registeredPQStates);

                readStateHandleStateData(
                        fsDataInputStream,
                        inView,
                        keyGroupsStateHandle.getGroupRangeOffsets(),
                        kvStatesById,
                        restoredMetaInfos.size(),
                        serializationProxy.getReadVersion(),
                        serializationProxy.isUsingKeyGroupCompression());
                LOG.info("Finished restoring from state handle: {}.", keyedStateHandle);
            } finally {
                // Close the stream only if the registry still held it. This must stay a plain
                // conditional: a 'continue' inside 'finally' would complete the block abruptly
                // and discard any exception thrown by the try body.
                if (cancelStreamRegistry.unregisterCloseable(fsDataInputStream)) {
                    IOUtils.closeQuietly(fsDataInputStream);
                }
            }
        }
        return null;
    }

    /**
     * Reads the data of all key-groups of one state handle that belong to {@link #keyGroupRange}.
     *
     * @param fsDataInputStream stream of the state handle; repositioned to each key-group offset.
     * @param inView data view over {@code fsDataInputStream}, used to read the key-group index.
     * @param keyGroupOffsets pairs of key-group index and stream offset to iterate over.
     * @param kvStatesById restored state meta information by state id.
     * @param numStates number of state entries to read per key-group.
     * @param readVersion version passed on to the key-group readers of the registered states.
     * @param isCompressed whether the key-group data is read through the Snappy decorator
     *     ({@code true}) or the uncompressed decorator ({@code false}).
     * @throws IOException if seeking, reading or closing a stream fails.
     * @throws IllegalStateException if the key-group index found in the stream differs from the
     *     index recorded in {@code keyGroupOffsets}.
     */
    private void readStateHandleStateData(
            FSDataInputStream fsDataInputStream,
            DataInputViewStreamWrapper inView,
            KeyGroupRangeOffsets keyGroupOffsets,
            Map<Integer, StateMetaInfoSnapshot> kvStatesById,
            int numStates,
            int readVersion,
            boolean isCompressed)
            throws IOException {
        final StreamCompressionDecorator streamCompressionDecorator =
                isCompressed
                        ? SnappyStreamCompressionDecorator.INSTANCE
                        : UncompressedStreamCompressionDecorator.INSTANCE;

        for (Tuple2<Integer, Long> groupOffset : keyGroupOffsets) {
            int keyGroupIndex = groupOffset.f0;
            long offset = groupOffset.f1;

            if (!keyGroupRange.contains(keyGroupIndex)) {
                LOG.debug(
                        "Key group {} doesn't belong to this backend with key group range: {}",
                        keyGroupIndex,
                        keyGroupRange);
                continue;
            }

            fsDataInputStream.seek(offset);

            // The key-group index is read from the raw stream, before any decompression.
            int writtenKeyGroupIndex = inView.readInt();
            Preconditions.checkState(
                    writtenKeyGroupIndex == keyGroupIndex, "Unexpected key-group in restore.");

            // try-with-resources closes the decorated stream and still propagates failures
            // raised while reading the key-group.
            try (InputStream kgCompressionInStream =
                    streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {
                readKeyGroupStateData(
                        kgCompressionInStream,
                        kvStatesById,
                        keyGroupIndex,
                        numStates,
                        readVersion);
            }
        }
    }

    /**
     * Reads {@code numStates} state entries of a single key-group into the registered states.
     *
     * <p>Each entry starts with a state id read as a {@code short}, followed by the mappings that
     * the matching state's key-group reader consumes from the stream.
     *
     * @param inputStream stream positioned at the first state entry of the key-group.
     * @param kvStatesById restored state meta information by state id.
     * @param keyGroupIndex index of the key-group being read.
     * @param numStates number of state entries to read.
     * @param readVersion version used to obtain the key-group reader of each registered state.
     * @throws IOException if reading from {@code inputStream} fails.
     * @throws IllegalStateException if a state id has no meta information or the meta information
     *     has an unexpected backend state type.
     */
    private void readKeyGroupStateData(
            InputStream inputStream,
            Map<Integer, StateMetaInfoSnapshot> kvStatesById,
            int keyGroupIndex,
            int numStates,
            int readVersion)
            throws IOException {
        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inputStream);

        for (int i = 0; i < numStates; i++) {
            // Widen to int on purpose: a short would be boxed to a Short for Map#get(Object)
            // and could never equal one of the map's Integer keys.
            final int kvStateId = inView.readShort();
            final StateMetaInfoSnapshot stateMetaInfoSnapshot = kvStatesById.get(kvStateId);
            if (stateMetaInfoSnapshot == null) {
                throw new IllegalStateException(
                        "No state meta information found for state id "
                                + kvStateId
                                + " in key-group "
                                + keyGroupIndex
                                + ".");
            }

            final StateSnapshotRestore registeredState;
            switch (stateMetaInfoSnapshot.getBackendStateType()) {
                case KEY_VALUE:
                    registeredState = registeredKVStates.get(stateMetaInfoSnapshot.getName());
                    break;
                case PRIORITY_QUEUE:
                    registeredState = registeredPQStates.get(stateMetaInfoSnapshot.getName());
                    break;
                default:
                    throw new IllegalStateException(
                            "Unexpected state type: "
                                    + stateMetaInfoSnapshot.getBackendStateType()
                                    + ".");
            }

            StateSnapshotKeyGroupReader keyGroupReader =
                    registeredState.keyGroupReader(readVersion);
            keyGroupReader.readMappingsInKeyGroup(inView, keyGroupIndex);
        }
    }
}

