/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.heap.AbstractHeapMergingState;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.util.Preconditions;

public class HeapReducingState<K, N, V>
extends AbstractHeapMergingState<K, N, V, V, V, ReducingState<V>, ReducingStateDescriptor<V>>
implements InternalReducingState<N, V> {
    private final ReduceTransformation<V> reduceTransformation;

    public HeapReducingState(ReducingStateDescriptor<V> stateDesc, StateTable<K, N, V> stateTable, TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer) {
        super(stateDesc, stateTable, keySerializer, namespaceSerializer);
        this.reduceTransformation = new ReduceTransformation(stateDesc.getReduceFunction());
    }

    public V get() {
        return (V)this.stateTable.get(this.currentNamespace);
    }

    public void add(V value) throws IOException {
        if (value == null) {
            this.clear();
            return;
        }
        try {
            this.stateTable.transform(this.currentNamespace, value, this.reduceTransformation);
        }
        catch (Exception e) {
            throw new IOException("Exception while applying ReduceFunction in reducing state", e);
        }
    }

    @Override
    protected V mergeState(V a, V b) throws Exception {
        return this.reduceTransformation.apply(a, b);
    }

    static final class ReduceTransformation<V>
    implements StateTransformationFunction<V, V> {
        private final ReduceFunction<V> reduceFunction;

        ReduceTransformation(ReduceFunction<V> reduceFunction) {
            this.reduceFunction = (ReduceFunction)Preconditions.checkNotNull(reduceFunction);
        }

        @Override
        public V apply(V previousState, V value) throws Exception {
            return (V)(previousState != null ? this.reduceFunction.reduce(previousState, value) : value);
        }
    }
}

