/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.StateIterator;
import org.apache.flink.core.state.InternalStateFuture;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;

public abstract class AbstractStateIterator<T>
implements StateIterator<T> {
    final State originalState;
    final StateRequestType requestType;
    final StateRequestHandler stateHandler;
    final Collection<T> cache;

    public AbstractStateIterator(State originalState, StateRequestType requestType, StateRequestHandler stateHandler, Collection<T> partialResult) {
        this.originalState = originalState;
        this.requestType = requestType;
        this.stateHandler = stateHandler;
        this.cache = partialResult;
    }

    protected abstract boolean hasNext();

    protected abstract Object nextPayloadForContinuousLoading();

    protected StateRequestType getRequestType() {
        return this.requestType;
    }

    private InternalStateFuture<StateIterator<T>> asyncNextLoad() {
        return this.stateHandler.handleRequest(this.originalState, StateRequestType.ITERATOR_LOADING, this.nextPayloadForContinuousLoading());
    }

    @Override
    public <U> StateFuture<Collection<U>> onNext(Function<T, StateFuture<? extends U>> iterating) {
        if (this.isEmpty()) {
            return StateFutureUtils.completedFuture(Collections.emptyList());
        }
        ArrayList<StateFuture<? extends U>> resultFutures = new ArrayList<StateFuture<? extends U>>();
        for (T item : this.cache) {
            resultFutures.add(iterating.apply(item));
        }
        if (this.hasNext()) {
            return StateFutureUtils.combineAll(resultFutures).thenCombine(this.asyncNextLoad().thenCompose(itr -> itr.onNext(iterating)), (a, b) -> {
                ArrayList result = new ArrayList(a.size() + b.size());
                result.addAll(a);
                result.addAll(b);
                return result;
            });
        }
        return StateFutureUtils.combineAll(resultFutures);
    }

    @Override
    public StateFuture<Void> onNext(Consumer<T> iterating) {
        if (this.isEmpty()) {
            return StateFutureUtils.completedVoidFuture();
        }
        for (T item : this.cache) {
            iterating.accept(item);
        }
        if (this.hasNext()) {
            return this.asyncNextLoad().thenCompose(itr -> itr.onNext(iterating));
        }
        return StateFutureUtils.completedVoidFuture();
    }

    @Override
    public boolean isEmpty() {
        return (this.cache == null || this.cache.isEmpty()) && !this.hasNext();
    }
}

