/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.v2;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.state.v2.AbstractKeyedState;
import org.apache.flink.runtime.state.v2.internal.InternalAggregatingState;

/**
 * Keyed aggregating state that stores an accumulator of type {@code ACC} per key/namespace and
 * exposes it through an {@link AggregateFunction}.
 *
 * <p>Every read and write is forwarded to the {@link StateRequestHandler} given at construction,
 * using {@link StateRequestType#AGGREGATING_GET} for reads and {@link
 * StateRequestType#AGGREGATING_ADD} for writes of the accumulator. Each operation exists in an
 * asynchronous flavour (returning a {@link StateFuture}) and a synchronous flavour.
 *
 * @param <K> type of the key
 * @param <N> type of the namespace
 * @param <IN> type of the values added to the state
 * @param <ACC> type of the stored accumulator
 * @param <OUT> type of the aggregated result
 */
public class AbstractAggregatingState<K, N, IN, ACC, OUT> extends AbstractKeyedState<K, N, ACC>
        implements InternalAggregatingState<K, N, IN, ACC, OUT> {

    /** Function used to create, update, merge and read out accumulators. */
    protected final AggregateFunction<IN, ACC, OUT> aggregateFunction;

    /**
     * Creates the state.
     *
     * @param stateRequestHandler handler that every state request is forwarded to
     * @param aggregateFunction function that defines how values are folded into the accumulator
     * @param valueSerializer serializer for the accumulator, passed to the super class
     */
    public AbstractAggregatingState(
            StateRequestHandler stateRequestHandler,
            AggregateFunction<IN, ACC, OUT> aggregateFunction,
            TypeSerializer<ACC> valueSerializer) {
        super(stateRequestHandler, valueSerializer);
        this.aggregateFunction = aggregateFunction;
    }

    /**
     * Asynchronously reads the accumulator and converts it to the aggregated result.
     *
     * @return a future holding {@code null} when no accumulator is stored, otherwise the value of
     *     {@link AggregateFunction#getResult(Object)} for the stored accumulator
     */
    @Override
    public StateFuture<OUT> asyncGet() {
        return asyncGetInternal()
                .thenApply(acc -> acc == null ? null : aggregateFunction.getResult(acc));
    }

    /**
     * Asynchronously folds {@code value} into the accumulator and writes the result back.
     *
     * @param value the value to add
     * @return a future that completes once the update request has completed
     */
    @Override
    public StateFuture<Void> asyncAdd(IN value) {
        return asyncGetInternal()
                .thenCompose(
                        acc -> {
                            // A missing accumulator is replaced by a freshly created one.
                            final ACC safeAcc =
                                    acc == null ? aggregateFunction.createAccumulator() : acc;
                            return asyncUpdateInternal(aggregateFunction.add(value, safeAcc));
                        });
    }

    /**
     * Asynchronously reads the raw accumulator.
     *
     * @return a future holding the accumulator returned by the request handler
     */
    @Override
    public StateFuture<ACC> asyncGetInternal() {
        return handleRequest(StateRequestType.AGGREGATING_GET, null);
    }

    /**
     * Asynchronously writes the raw accumulator.
     *
     * @param valueToStore the accumulator to store; callers in this class also pass {@code null}
     *     for merged-away source namespaces
     * @return a future that completes once the request has completed
     */
    @Override
    public StateFuture<Void> asyncUpdateInternal(ACC valueToStore) {
        return handleRequest(StateRequestType.AGGREGATING_ADD, valueToStore);
    }

    /**
     * Synchronously reads the accumulator and converts it to the aggregated result.
     *
     * @return {@code null} when no accumulator is stored, otherwise the value of {@link
     *     AggregateFunction#getResult(Object)} for the stored accumulator
     */
    @Override
    public OUT get() {
        final ACC acc = getInternal();
        return acc == null ? null : aggregateFunction.getResult(acc);
    }

    /**
     * Synchronously folds {@code value} into the accumulator and writes the result back.
     *
     * @param value the value to add
     * @throws RuntimeException wrapping any exception raised while aggregating or updating
     */
    @Override
    public void add(IN value) {
        final ACC acc = getInternal();
        try {
            // A missing accumulator is replaced by a freshly created one.
            final ACC base = acc == null ? aggregateFunction.createAccumulator() : acc;
            updateInternal(aggregateFunction.add(value, base));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Asynchronously merges the accumulators of all {@code sources} namespaces into {@code
     * target}.
     *
     * <p>Phase 1 issues a read for every non-null source namespace and for the target. Phase 2
     * writes {@code null} to every source that held an accumulator, merges those accumulators (and
     * the target's, if present) with {@link AggregateFunction#merge(Object, Object)}, and stores
     * the result under the target namespace. When no source holds an accumulator the target is
     * left untouched.
     *
     * <p>{@code null} entries in {@code sources} are ignored in both phases.
     *
     * @param target the namespace receiving the merged accumulator
     * @param sources the namespaces to merge; {@code null} or empty results in a completed future
     * @return a future that completes once all update requests have completed
     */
    @Override
    public StateFuture<Void> asyncMergeNamespaces(N target, Collection<N> sources) {
        if (sources == null || sources.isEmpty()) {
            return StateFutureUtils.completedVoidFuture();
        }
        // Snapshot the non-null sources once. Phase 2 runs later and must visit exactly the
        // namespaces that were read in phase 1, in the same order, so that every value is paired
        // with the namespace it was read from.
        final Collection<N> nonNullSources = new ArrayList<>(sources.size());
        for (N source : sources) {
            if (source != null) {
                nonNullSources.add(source);
            }
        }

        // Phase 1: read from the sources and, last, from the target.
        final Collection<StateFuture<ACC>> futures = new ArrayList<>(nonNullSources.size() + 1);
        for (N source : nonNullSources) {
            setCurrentNamespace(source);
            futures.add(asyncGetInternal());
        }
        setCurrentNamespace(target);
        futures.add(asyncGetInternal());

        // Phase 2: merge the sources into the target.
        return StateFutureUtils.combineAll(futures)
                .thenCompose(
                        values -> {
                            final Collection<StateFuture<Void>> updateFutures =
                                    new ArrayList<>(nonNullSources.size() + 1);
                            ACC current = null;
                            final Iterator<ACC> valueIterator = values.iterator();
                            for (N source : nonNullSources) {
                                final ACC value = valueIterator.next();
                                if (value == null) {
                                    continue;
                                }
                                // The source's accumulator moves to the target; write null back
                                // to the source (presumably clearing it -- confirm against the
                                // request handler).
                                setCurrentNamespace(source);
                                updateFutures.add(asyncUpdateInternal(null));
                                current =
                                        current == null
                                                ? value
                                                : aggregateFunction.merge(current, value);
                            }
                            // The last value belongs to the target namespace.
                            final ACC targetValue = valueIterator.next();
                            if (current != null) {
                                if (targetValue != null) {
                                    current = aggregateFunction.merge(current, targetValue);
                                }
                                setCurrentNamespace(target);
                                updateFutures.add(asyncUpdateInternal(current));
                            }
                            return StateFutureUtils.combineAll(updateFutures)
                                    .thenAccept(ignores -> {});
                        });
    }

    /**
     * Synchronously merges the accumulators of all {@code sources} namespaces into {@code target}.
     *
     * <p>For every non-null source that holds an accumulator, {@code null} is written back to the
     * source and the accumulator is merged into a running result. If anything was collected, it is
     * merged with the target's accumulator (when present) and stored under the target namespace.
     *
     * @param target the namespace receiving the merged accumulator
     * @param sources the namespaces to merge; {@code null} or empty is a no-op
     * @throws RuntimeException wrapping any exception raised while merging
     */
    @Override
    public void mergeNamespaces(N target, Collection<N> sources) {
        if (sources == null || sources.isEmpty()) {
            return;
        }
        try {
            ACC current = null;
            // Collect and merge the accumulators of all sources.
            for (N source : sources) {
                if (source == null) {
                    continue;
                }
                setCurrentNamespace(source);
                final ACC oldValue = handleRequestSync(StateRequestType.AGGREGATING_GET, null);
                if (oldValue == null) {
                    continue;
                }
                // Write null back to the source before folding its accumulator into the result.
                handleRequestSync(StateRequestType.AGGREGATING_ADD, null);
                current = current == null ? oldValue : aggregateFunction.merge(current, oldValue);
            }

            // Only touch the target when at least one source contributed an accumulator.
            if (current != null) {
                setCurrentNamespace(target);
                final ACC targetValue = handleRequestSync(StateRequestType.AGGREGATING_GET, null);
                if (targetValue != null) {
                    current = aggregateFunction.merge(current, targetValue);
                }
                handleRequestSync(StateRequestType.AGGREGATING_ADD, current);
            }
        } catch (Exception e) {
            throw new RuntimeException("merge namespace fail.", e);
        }
    }

    /**
     * Synchronously reads the raw accumulator.
     *
     * @return the accumulator returned by the request handler
     */
    @Override
    public ACC getInternal() {
        return handleRequestSync(StateRequestType.AGGREGATING_GET, null);
    }

    /**
     * Synchronously writes the raw accumulator.
     *
     * @param valueToStore the accumulator to store
     */
    @Override
    public void updateInternal(ACC valueToStore) {
        handleRequestSync(StateRequestType.AGGREGATING_ADD, valueToStore);
    }
}

