/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst.sync;

import java.io.IOException;
import java.util.Collection;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.state.forst.sync.AbstractForStSyncAppendingState;
import org.apache.flink.state.forst.sync.ForStSyncKeyedStateBackend;
import org.forstdb.ColumnFamilyHandle;
import org.forstdb.RocksDBException;

/**
 * Aggregating state whose accumulator is stored in a ForSt column family and accessed
 * synchronously through the owning {@link ForStSyncKeyedStateBackend}.
 *
 * <p>Only the accumulator is persisted. Incoming values are folded into it with the configured
 * {@link AggregateFunction}, and the externally visible result is derived from the accumulator on
 * every {@link #get()}.
 *
 * @param <K> type of the key
 * @param <N> type of the namespace
 * @param <T> type of the values added to the state
 * @param <ACC> type of the accumulator that is serialized into the column family
 * @param <R> type of the result computed from the accumulator
 */
class ForStSyncAggregatingState<K, N, T, ACC, R>
        extends AbstractForStSyncAppendingState<K, N, T, ACC, R>
        implements InternalAggregatingState<K, N, T, ACC, R> {

    /** Function used to create, update, merge and finalize accumulators; replaceable on update. */
    private AggregateFunction<T, ACC, R> aggFunction;

    /**
     * Creates the state; instances are obtained through {@link #create}.
     *
     * @param columnFamily column family that stores the accumulators of this state
     * @param namespaceSerializer serializer for the namespace
     * @param valueSerializer serializer for the accumulator
     * @param defaultValue default value handed to the parent state
     * @param aggFunction function applied to accumulators and incoming values
     * @param backend backend that owns this state
     */
    private ForStSyncAggregatingState(
            ColumnFamilyHandle columnFamily,
            TypeSerializer<N> namespaceSerializer,
            TypeSerializer<ACC> valueSerializer,
            ACC defaultValue,
            AggregateFunction<T, ACC, R> aggFunction,
            ForStSyncKeyedStateBackend<K> backend) {
        super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
        this.aggFunction = aggFunction;
    }

    /** Returns the key serializer of the owning backend. */
    public TypeSerializer<K> getKeySerializer() {
        return this.backend.getKeySerializer();
    }

    /** Returns the serializer used for namespaces of this state. */
    public TypeSerializer<N> getNamespaceSerializer() {
        return this.namespaceSerializer;
    }

    /** Returns the serializer used for the stored accumulator. */
    public TypeSerializer<ACC> getValueSerializer() {
        return this.valueSerializer;
    }

    /**
     * Computes the result for the current key and namespace.
     *
     * @return the aggregate function's result for the stored accumulator, or {@code null} when no
     *     accumulator is stored
     * @throws IOException propagated from reading the accumulator
     * @throws RocksDBException propagated from the underlying database access
     */
    public R get() throws IOException, RocksDBException {
        ACC accumulator = this.getInternal();
        if (accumulator == null) {
            return null;
        }
        return this.aggFunction.getResult(accumulator);
    }

    /**
     * Folds {@code value} into the accumulator of the current key and namespace.
     *
     * <p>When nothing is stored yet, a fresh accumulator is obtained from the aggregate function
     * before the value is added.
     *
     * @param value value to add
     * @throws IOException propagated from reading or writing the accumulator
     * @throws RocksDBException propagated from the underlying database access
     */
    public void add(T value) throws IOException, RocksDBException {
        // The key bytes are obtained once and reused for both the read and the write.
        byte[] key = this.getKeyBytes();
        ACC accumulator = this.getInternal(key);
        if (accumulator == null) {
            accumulator = this.aggFunction.createAccumulator();
        }
        this.updateInternal(key, this.aggFunction.add(value, accumulator));
    }

    /**
     * Merges the accumulators stored under {@code sources} into {@code target} for the current key.
     *
     * <p>Every non-null source namespace that holds an accumulator is deleted and its accumulator
     * is merged into a running value. When at least one source contributed, the running value is
     * merged with the accumulator already stored under {@code target} (if any) and written back.
     * When no source holds an accumulator, {@code target} is neither read nor written.
     *
     * <p>This method changes the current namespace: it is set to each non-null source in turn and,
     * if a result is written, finally to {@code target}.
     *
     * @param target namespace that receives the merged accumulator
     * @param sources namespaces to merge and clear; {@code null}, empty, and {@code null} elements
     *     are tolerated
     * @throws IOException propagated from (de)serializing accumulators
     * @throws RocksDBException propagated from the underlying database access
     */
    public void mergeNamespaces(N target, Collection<N> sources)
            throws IOException, RocksDBException {
        if (sources == null || sources.isEmpty()) {
            return;
        }

        ACC current = null;
        for (N source : sources) {
            if (source == null) {
                continue;
            }
            this.setCurrentNamespace(source);
            byte[] sourceKey = this.serializeCurrentKeyWithGroupAndNamespace();
            byte[] valueBytes = this.backend.db.get(this.columnFamily, sourceKey);
            if (valueBytes == null) {
                continue;
            }
            // The source entry is removed before its accumulator is folded into the result.
            this.backend.db.delete(this.columnFamily, this.writeOptions, sourceKey);
            ACC value = this.deserializeAccumulator(valueBytes);
            current = current == null ? value : this.aggFunction.merge(current, value);
        }

        if (current == null) {
            // None of the sources held an accumulator, so the target is left untouched.
            return;
        }

        this.setCurrentNamespace(target);
        byte[] targetKey = this.serializeCurrentKeyWithGroupAndNamespace();
        byte[] targetValueBytes = this.backend.db.get(this.columnFamily, targetKey);
        if (targetValueBytes != null) {
            // The target already holds an accumulator: merge instead of overwriting it.
            current = this.aggFunction.merge(current, this.deserializeAccumulator(targetValueBytes));
        }

        this.dataOutputView.clear();
        this.valueSerializer.serialize(current, this.dataOutputView);
        this.backend.db.put(
                this.columnFamily,
                this.writeOptions,
                targetKey,
                this.dataOutputView.getCopyOfBuffer());
    }

    /** Deserializes an accumulator from {@code bytes} using the shared input view. */
    private ACC deserializeAccumulator(byte[] bytes) throws IOException {
        this.dataInputView.setBuffer(bytes);
        return this.valueSerializer.deserialize(this.dataInputView);
    }

    /**
     * Replaces the aggregate function used by this state.
     *
     * @param aggFunction the new aggregate function
     * @return this state, to allow call chaining
     */
    ForStSyncAggregatingState<K, N, T, ACC, R> setAggFunction(
            AggregateFunction<T, ACC, R> aggFunction) {
        this.aggFunction = aggFunction;
        return this;
    }

    /**
     * Creates a new aggregating state from a descriptor and its registration result.
     *
     * @param stateDesc descriptor of the state; it is cast to {@link AggregatingStateDescriptor}
     *     to obtain the aggregate function
     * @param registerResult column family handle ({@code f0}) and registered meta info
     *     ({@code f1}) supplying the namespace and state serializers
     * @param backend backend that owns the new state
     * @return the new state, cast to the caller's expected internal state type
     */
    @SuppressWarnings("unchecked")
    static <K, N, SV, S extends State, IS extends S> IS create(
            StateDescriptor<S, SV> stateDesc,
            Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
            ForStSyncKeyedStateBackend<K> backend) {
        RegisteredKeyValueStateBackendMetaInfo<N, SV> metaInfo = registerResult.f1;
        // The input and output types of the aggregate function are not expressible through the
        // descriptor's static type here, hence the wildcard cast and the unchecked cast to IS.
        return (IS)
                new ForStSyncAggregatingState<>(
                        registerResult.f0,
                        metaInfo.getNamespaceSerializer(),
                        metaInfo.getStateSerializer(),
                        stateDesc.getDefaultValue(),
                        ((AggregatingStateDescriptor<?, SV, ?>) stateDesc).getAggregateFunction(),
                        backend);
    }

    /**
     * Refreshes an existing aggregating state with the aggregate function, serializers and default
     * value taken from a descriptor and its registration result.
     *
     * @param stateDesc descriptor of the state; it is cast to {@link AggregatingStateDescriptor}
     *     to obtain the aggregate function
     * @param registerResult registration result whose meta info ({@code f1}) supplies the
     *     namespace and state serializers
     * @param existingState state to update; it is cast to {@code ForStSyncAggregatingState}
     * @return the state returned by the last chained setter, cast to the caller's expected type
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static <K, N, SV, S extends State, IS extends S> IS update(
            StateDescriptor<S, SV> stateDesc,
            Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
            IS existingState) {
        RegisteredKeyValueStateBackendMetaInfo<N, SV> metaInfo = registerResult.f1;
        // Raw types are intentional: the concrete T/R of the existing state and of the
        // descriptor's aggregate function cannot be related to each other statically.
        return (IS)
                ((ForStSyncAggregatingState) existingState)
                        .setAggFunction(
                                ((AggregatingStateDescriptor) stateDesc).getAggregateFunction())
                        .setNamespaceSerializer(metaInfo.getNamespaceSerializer())
                        .setValueSerializer(metaInfo.getStateSerializer())
                        .setDefaultValue(stateDesc.getDefaultValue());
    }
}

