/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.changelog.restore;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoReader;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
import org.apache.flink.state.changelog.ChangelogState;
import org.apache.flink.state.changelog.StateChangeOperation;
import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
import org.apache.flink.state.changelog.restore.ChangelogApplierFactoryImpl;
import org.apache.flink.state.changelog.restore.ChangelogRestoreTarget;
import org.apache.flink.state.changelog.restore.FunctionDelegationHelper;
import org.apache.flink.state.changelog.restore.StateChangeApplier;
import org.apache.flink.state.changelog.restore.StateID;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Applies a single {@link StateChange} read from the changelog to a {@link ChangelogRestoreTarget}.
 *
 * <p>Each change starts with a one-byte {@link StateChangeOperation} code. A {@code METADATA}
 * change registers a state on the target and records its short id in the caller-supplied id map;
 * every other operation is a data change that is routed to the already registered state
 * identified by that short id.
 */
final class ChangelogBackendLogApplier {
    private static final Logger LOG = LoggerFactory.getLogger(ChangelogBackendLogApplier.class);

    /**
     * Decodes the operation code from the head of the change payload and applies the change.
     *
     * @param stateChange the change to apply; its payload is read from {@code getChange()}
     * @param changelogRestoreTarget the target on which states are created and looked up
     * @param classLoader class loader passed to the state meta info reader
     * @param stateIds mapping from short state id to {@link StateID}; extended by metadata
     *     changes and consulted by data changes
     * @throws Exception if reading the payload or applying the change fails
     */
    public static void apply(StateChange stateChange, ChangelogRestoreTarget<?> changelogRestoreTarget, ClassLoader classLoader, Map<Short, StateID> stateIds) throws Exception {
        DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(new ByteArrayInputStream(stateChange.getChange()));
        // The first byte of every change is the operation code; the rest is operation-specific.
        StateChangeOperation operation = StateChangeOperation.byCode(in.readByte());
        applyOperation(operation, stateChange.getKeyGroup(), changelogRestoreTarget, in, classLoader, ChangelogApplierFactoryImpl.INSTANCE, stateIds);
    }

    /**
     * Dispatches a decoded operation. Metadata changes are always applied; data changes are
     * applied only when the target's key group range contains {@code keyGroup} and are skipped
     * otherwise.
     */
    private static void applyOperation(StateChangeOperation operation, int keyGroup, ChangelogRestoreTarget<?> changelogRestoreTarget, DataInputView in, ClassLoader classLoader, ChangelogApplierFactory factory, Map<Short, StateID> stateIds) throws Exception {
        LOG.debug("apply {} in key group {}", operation, keyGroup);
        if (operation == StateChangeOperation.METADATA) {
            applyMetaDataChange(in, changelogRestoreTarget, classLoader, stateIds);
        } else if (changelogRestoreTarget.getKeyGroupRange().contains(keyGroup)) {
            applyDataChange(in, factory, changelogRestoreTarget, operation, stateIds);
        }
    }

    /**
     * Reads a state meta info snapshot, registers the corresponding state on the target and
     * records the state's short id in {@code stateIds}.
     *
     * @throws RuntimeException if the snapshot's backend state type is neither key/value nor
     *     priority queue
     */
    private static void applyMetaDataChange(DataInputView in, ChangelogRestoreTarget<?> changelogRestoreTarget, ClassLoader classLoader, Map<Short, StateID> stateIds) throws Exception {
        StateMetaInfoSnapshot snapshot = readStateMetaInfoSnapshot(in, classLoader);
        // Only the name is needed below, so it is captured as a String; the two meta info
        // classes returned by the restore helpers are different types.
        String stateName;
        switch (snapshot.getBackendStateType()) {
            case KEY_VALUE: {
                stateName = restoreKvMetaData(changelogRestoreTarget, snapshot, in).getName();
                break;
            }
            case PRIORITY_QUEUE: {
                stateName = restorePqMetaData(changelogRestoreTarget, snapshot).getName();
                break;
            }
            default: {
                throw new RuntimeException("Unsupported state type: " + snapshot.getBackendStateType() + ", state: " + snapshot.getName());
            }
        }
        // Read order matters: the short state id precedes the one-byte state type code.
        short stateId = in.readShort();
        StateMetaInfoSnapshot.BackendStateType stateType = StateMetaInfoSnapshot.BackendStateType.byCode(in.readByte());
        stateIds.put(stateId, new StateID(stateName, stateType));
    }

    /**
     * Reads an optional TTL configuration: a boolean presence flag followed, when set, by a
     * Java-serialized {@link StateTtlConfig}.
     *
     * @return the deserialized configuration, or {@link StateTtlConfig#DISABLED} when the flag is
     *     not set
     * @throws IOException if reading fails or the serialized class cannot be resolved
     */
    private static StateTtlConfig readTtlConfig(DataInputView in) throws IOException {
        if (!in.readBoolean()) {
            return StateTtlConfig.DISABLED;
        }
        // NOTE(review): this uses Java-native deserialization on changelog bytes; it is only
        // safe as long as the changelog storage is trusted - confirm.
        try (ObjectInputStream objectInputStream = new ObjectInputStream(new DataInputViewStream(in))) {
            return (StateTtlConfig) objectInputStream.readObject();
        } catch (ClassNotFoundException e) {
            throw new IOException(e);
        }
    }

    /**
     * Reads an optional default value: a boolean presence flag followed, when set, by the value
     * encoded with the state serializer of {@code meta}.
     *
     * @return the deserialized default value, or {@code null} when the flag is not set
     */
    @Nullable
    @SuppressWarnings("rawtypes") // meta info type arguments are not known at this point
    private static Object readDefaultValue(DataInputView in, RegisteredKeyValueStateBackendMetaInfo meta) throws IOException {
        if (in.readBoolean()) {
            return meta.getStateSerializer().deserialize(in);
        }
        return null;
    }

    /**
     * Rebuilds a state descriptor from the snapshot plus the TTL configuration and default value
     * that follow it in the input, and creates the keyed state on the target.
     *
     * @return the meta info built from {@code snapshot}
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // meta info type arguments are not known at this point
    private static RegisteredKeyValueStateBackendMetaInfo restoreKvMetaData(ChangelogRestoreTarget<?> changelogRestoreTarget, StateMetaInfoSnapshot snapshot, DataInputView in) throws Exception {
        RegisteredKeyValueStateBackendMetaInfo meta = new RegisteredKeyValueStateBackendMetaInfo(snapshot);
        // Read order matters: the TTL configuration precedes the default value in the input.
        StateTtlConfig ttlConfig = readTtlConfig(in);
        Object defaultValue = readDefaultValue(in, meta);
        StateDescriptor stateDescriptor = toStateDescriptor(meta, defaultValue);
        if (ttlConfig.isEnabled()) {
            stateDescriptor.enableTimeToLive(ttlConfig);
        }
        changelogRestoreTarget.createKeyedState(meta.getNamespaceSerializer(), stateDescriptor);
        return meta;
    }

    /**
     * Creates the state descriptor matching the state type recorded in {@code meta}. The default
     * value is only used for value state; aggregating and reducing descriptors are given the
     * functions supplied by {@link FunctionDelegationHelper}.
     *
     * @throws IllegalArgumentException if the state type is not one of the handled types
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // serializers are only known as raw types here
    private static StateDescriptor toStateDescriptor(RegisteredKeyValueStateBackendMetaInfo meta, @Nullable Object defaultValue) {
        switch (meta.getStateType()) {
            case VALUE: {
                return new ValueStateDescriptor(meta.getName(), meta.getStateSerializer(), defaultValue);
            }
            case MAP: {
                MapSerializer mapSerializer = (MapSerializer) meta.getStateSerializer();
                return new MapStateDescriptor(meta.getName(), mapSerializer.getKeySerializer(), mapSerializer.getValueSerializer());
            }
            case LIST: {
                ListSerializer listSerializer = (ListSerializer) meta.getStateSerializer();
                return new ListStateDescriptor(meta.getName(), listSerializer.getElementSerializer());
            }
            case AGGREGATING: {
                return new AggregatingStateDescriptor(meta.getName(), FunctionDelegationHelper.delegateAggregateFunction(), meta.getStateSerializer());
            }
            case REDUCING: {
                return new ReducingStateDescriptor(meta.getName(), FunctionDelegationHelper.delegateReduceFunction(), meta.getStateSerializer());
            }
        }
        throw new IllegalArgumentException(meta.getStateType().toString());
    }

    /**
     * Creates the priority queue state described by {@code snapshot} on the target.
     *
     * @return the meta info built from {@code snapshot}
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // meta info type arguments are not known at this point
    private static RegisteredPriorityQueueStateBackendMetaInfo restorePqMetaData(ChangelogRestoreTarget<?> changelogRestoreTarget, StateMetaInfoSnapshot snapshot) {
        RegisteredPriorityQueueStateBackendMetaInfo meta = new RegisteredPriorityQueueStateBackendMetaInfo(snapshot);
        changelogRestoreTarget.createPqState(meta.getName(), meta.getElementSerializer());
        return meta;
    }

    /**
     * Reads an int format version followed by a state meta info snapshot written in that
     * version.
     */
    private static StateMetaInfoSnapshot readStateMetaInfoSnapshot(DataInputView in, ClassLoader classLoader) throws IOException {
        int version = in.readInt();
        StateMetaInfoReader reader = StateMetaInfoSnapshotReadersWriters.getReader(version);
        return reader.readStateMetaInfoSnapshot(in, classLoader);
    }

    /**
     * Reads the short state id, resolves the registered state on the target and hands the
     * remaining input to that state's change applier.
     *
     * @throws NullPointerException if the state id is not present in {@code stateIds}
     * @throws IllegalStateException if the target has no existing state for the resolved id
     */
    private static void applyDataChange(DataInputView in, ChangelogApplierFactory factory, ChangelogRestoreTarget<?> changelogRestoreTarget, StateChangeOperation operation, Map<Short, StateID> stateIds) throws Exception {
        short stateId = in.readShort();
        StateID id = stateIds.get(stateId);
        if (id == null) {
            // Same exception type as a plain null check, but with the offending id attached.
            throw new NullPointerException("No state registered for state id " + stateId);
        }
        ChangelogState state = changelogRestoreTarget.getExistingState(id.stateName, id.stateType);
        Preconditions.checkState(state != null, (Object) String.format("%s state %s not found", id.stateType, id.stateName));
        StateChangeApplier changeApplier = state.getChangeApplier(factory);
        changeApplier.apply(operation, in);
    }

    /** Static utility class; not instantiable. */
    private ChangelogBackendLogApplier() {
    }
}

