/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.v2;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.state.v2.AbstractKeyedState;
import org.apache.flink.runtime.state.v2.AggregatingStateDescriptor;
import org.apache.flink.runtime.state.v2.internal.InternalAggregatingState;

/**
 * Keyed state that folds incoming values into an accumulator using an {@link AggregateFunction}.
 *
 * <p>The state stores the accumulator ({@code ACC}); the aggregated result ({@code OUT}) is derived
 * from it on read via {@link AggregateFunction#getResult(Object)}. All state access is delegated to
 * the request handler of the parent class through {@code handleRequest} (asynchronous) and
 * {@code handleRequestSync} (synchronous), using the {@code AGGREGATING_*} request types. Every
 * {@code AGGREGATING_ADD} request issued by this class carries the complete new accumulator as its
 * payload.
 *
 * @param <K> type of the key
 * @param <N> type of the namespace
 * @param <IN> type of the values added to the state
 * @param <ACC> type of the stored accumulator
 * @param <OUT> type of the aggregated result
 */
public class AbstractAggregatingState<K, N, IN, ACC, OUT>
        extends AbstractKeyedState<K, N, ACC>
        implements InternalAggregatingState<K, N, IN, ACC, OUT> {
    /** Function used to create, update, merge and finalize accumulators. */
    protected final AggregateFunction<IN, ACC, OUT> aggregateFunction;

    /**
     * Creates the state and takes the aggregate function from the descriptor.
     *
     * @param stateRequestHandler handler that all state requests are forwarded to
     * @param stateDescriptor descriptor supplying the aggregate function
     */
    public AbstractAggregatingState(StateRequestHandler stateRequestHandler, AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor) {
        super(stateRequestHandler, stateDescriptor);
        this.aggregateFunction = stateDescriptor.getAggregateFunction();
    }

    /**
     * Asynchronously reads the raw accumulator.
     *
     * @return future of the accumulator as delivered by the {@code AGGREGATING_GET} request
     */
    protected StateFuture<ACC> asyncGetAccumulator() {
        return this.handleRequest(StateRequestType.AGGREGATING_GET, null);
    }

    /**
     * Asynchronously reads the aggregated result.
     *
     * @return future of the result, or of {@code null} when the accumulator read yields
     *     {@code null}
     */
    public StateFuture<OUT> asyncGet() {
        return this.asyncGetAccumulator()
                .thenApply(acc -> acc == null ? null : this.aggregateFunction.getResult(acc));
    }

    /**
     * Asynchronously folds {@code value} into the accumulator, creating a fresh accumulator first
     * when none is read.
     *
     * <p>Note: the write request is issued inside the callback and its future is not chained into
     * the returned future.
     *
     * @param value the value to add
     * @return future that completes once the callback issuing the write has run
     */
    public StateFuture<Void> asyncAdd(IN value) {
        return this.asyncGetAccumulator().thenAccept(acc -> {
            ACC safeAcc = acc == null ? this.aggregateFunction.createAccumulator() : acc;
            this.handleRequest(StateRequestType.AGGREGATING_ADD, this.aggregateFunction.add(value, safeAcc));
        });
    }

    /**
     * Synchronously reads the aggregated result.
     *
     * @return the result derived from the accumulator, or {@code null} when the accumulator read
     *     yields {@code null}
     */
    public OUT get() {
        // AGGREGATING_GET yields the accumulator, so it must be converted to the result type,
        // exactly like asyncGet() does.
        ACC acc = this.handleRequestSync(StateRequestType.AGGREGATING_GET, null);
        return acc == null ? null : this.aggregateFunction.getResult(acc);
    }

    /**
     * Synchronously folds {@code value} into the accumulator, creating a fresh accumulator first
     * when none is read.
     *
     * @param value the value to add
     * @throws RuntimeException wrapping any exception raised while aggregating or writing
     */
    public void add(IN value) {
        ACC acc = this.handleRequestSync(StateRequestType.AGGREGATING_GET, null);
        try {
            // Mirror asyncAdd(): the value is folded into a fresh accumulator too, so the first
            // element is not lost, and only the accumulator is written back.
            ACC safeAcc = acc == null ? this.aggregateFunction.createAccumulator() : acc;
            this.handleRequestSync(StateRequestType.AGGREGATING_ADD, this.aggregateFunction.add(value, safeAcc));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Asynchronously merges the accumulators of all {@code sources} namespaces into
     * {@code target}.
     *
     * <p>Phase 1 issues a read for every non-null source namespace followed by one for the target.
     * Phase 2 issues a remove for each source that had an accumulator, merges those accumulators
     * (and the target's, if present) and writes the merged accumulator to the target. Nothing is
     * written when no source had an accumulator. The current namespace is changed while requests
     * are issued and is left at whichever namespace was set last.
     *
     * @param target namespace receiving the merged accumulator
     * @param sources namespaces to merge; {@code null} entries are skipped
     * @return future completing when all issued remove/write requests have completed; already
     *     completed when {@code sources} is {@code null} or empty
     */
    @Override
    public StateFuture<Void> asyncMergeNamespaces(N target, Collection<N> sources) {
        if (sources == null || sources.isEmpty()) {
            return StateFutureUtils.completedVoidFuture();
        }
        // Phase 1: read every non-null source, then the target (kept last on purpose).
        Collection<StateFuture<ACC>> futures = new ArrayList<>(sources.size() + 1);
        for (N source : sources) {
            if (source == null) {
                continue;
            }
            this.setCurrentNamespace(source);
            futures.add(this.handleRequest(StateRequestType.AGGREGATING_GET, null));
        }
        this.setCurrentNamespace(target);
        futures.add(this.handleRequest(StateRequestType.AGGREGATING_GET, null));
        // Phase 2: clear the sources and write the merged accumulator to the target.
        return StateFutureUtils.combineAll(futures).thenCompose(values -> {
            Collection<StateFuture<Void>> updateFutures = new ArrayList<>(sources.size() + 1);
            ACC current = null;
            Iterator<ACC> valueIterator = values.iterator();
            for (N source : sources) {
                if (source == null) {
                    // No read was issued for null sources in phase 1, so there is no result to
                    // consume; skipping here keeps results aligned with their namespaces.
                    continue;
                }
                ACC value = valueIterator.next();
                if (value == null) {
                    continue;
                }
                this.setCurrentNamespace(source);
                updateFutures.add(this.handleRequest(StateRequestType.AGGREGATING_REMOVE, null));
                current = current == null ? value : this.aggregateFunction.merge(current, value);
            }
            // The remaining result belongs to the target namespace.
            ACC targetValue = valueIterator.next();
            if (current != null) {
                if (targetValue != null) {
                    current = this.aggregateFunction.merge(current, targetValue);
                }
                this.setCurrentNamespace(target);
                updateFutures.add(this.handleRequest(StateRequestType.AGGREGATING_ADD, current));
            }
            return StateFutureUtils.combineAll(updateFutures).thenAccept(ignores -> {});
        });
    }

    /**
     * Synchronously merges the accumulators of all {@code sources} namespaces into
     * {@code target}.
     *
     * <p>Each non-null source that has an accumulator is removed and its accumulator merged into a
     * running result; the result is then merged with the target's accumulator (if present) and
     * written to the target. Nothing is written when no source had an accumulator. The current
     * namespace is changed during the merge and is left at whichever namespace was set last.
     *
     * @param target namespace receiving the merged accumulator
     * @param sources namespaces to merge; {@code null} entries are skipped, and a {@code null} or
     *     empty collection makes this a no-op
     * @throws RuntimeException wrapping any exception raised during the merge
     */
    @Override
    public void mergeNamespaces(N target, Collection<N> sources) {
        if (sources == null || sources.isEmpty()) {
            return;
        }
        try {
            ACC current = null;
            for (N source : sources) {
                if (source == null) {
                    continue;
                }
                this.setCurrentNamespace(source);
                ACC oldValue = this.handleRequestSync(StateRequestType.AGGREGATING_GET, null);
                if (oldValue == null) {
                    continue;
                }
                this.handleRequestSync(StateRequestType.AGGREGATING_REMOVE, null);
                current = current == null ? oldValue : this.aggregateFunction.merge(current, oldValue);
            }
            if (current != null) {
                this.setCurrentNamespace(target);
                ACC targetValue = this.handleRequestSync(StateRequestType.AGGREGATING_GET, null);
                if (targetValue != null) {
                    current = this.aggregateFunction.merge(current, targetValue);
                }
                this.handleRequestSync(StateRequestType.AGGREGATING_ADD, current);
            }
        }
        catch (Exception e) {
            throw new RuntimeException("merge namespace fail.", e);
        }
    }
}

