/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.memory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.runtime.state.memory.AbstractMemState;
import org.apache.flink.runtime.state.memory.AbstractMemStateSnapshot;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;

public class MemReducingState<K, N, V>
extends AbstractMemState<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>>
implements ReducingState<V> {
    private final ReduceFunction<V> reduceFunction;

    public MemReducingState(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<V> stateDesc) {
        super(keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc);
        this.reduceFunction = stateDesc.getReduceFunction();
    }

    public MemReducingState(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<V> stateDesc, HashMap<N, Map<K, V>> state) {
        super(keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc, state);
        this.reduceFunction = stateDesc.getReduceFunction();
    }

    public V get() {
        if (this.currentNSState == null) {
            this.currentNSState = (Map)this.state.get(this.currentNamespace);
        }
        if (this.currentNSState != null) {
            return this.currentNSState.get(this.currentKey);
        }
        return null;
    }

    public void add(V value) throws IOException {
        Object currentValue;
        if (this.currentKey == null) {
            throw new RuntimeException("No key available.");
        }
        if (this.currentNSState == null) {
            this.currentNSState = new HashMap();
            this.state.put(this.currentNamespace, this.currentNSState);
        }
        if ((currentValue = this.currentNSState.get(this.currentKey)) == null) {
            this.currentNSState.put(this.currentKey, value);
        } else {
            try {
                this.currentNSState.put(this.currentKey, this.reduceFunction.reduce(currentValue, value));
            }
            catch (Exception e) {
                throw new RuntimeException("Could not add value to reducing state.", e);
            }
        }
    }

    @Override
    public KvStateSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>, MemoryStateBackend> createHeapSnapshot(byte[] bytes) {
        return new Snapshot(this.getKeySerializer(), this.getNamespaceSerializer(), this.stateSerializer, (ReducingStateDescriptor)this.stateDesc, bytes);
    }

    public static class Snapshot<K, N, V>
    extends AbstractMemStateSnapshot<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>> {
        private static final long serialVersionUID = 1L;

        public Snapshot(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, TypeSerializer<V> stateSerializer, ReducingStateDescriptor<V> stateDescs, byte[] data) {
            super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, data);
        }

        @Override
        public KvState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, MemoryStateBackend> createMemState(HashMap<N, Map<K, V>> stateMap) {
            return new MemReducingState<K, N, V>(this.keySerializer, this.namespaceSerializer, (ReducingStateDescriptor)this.stateDesc, stateMap);
        }
    }
}

