/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.connector.source.util.ratelimit;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.util.Preconditions;

@Experimental
public class RateLimitedSourceReader<E, SplitT extends SourceSplit>
implements SourceReader<E, SplitT> {
    private final SourceReader<E, SplitT> sourceReader;
    private final RateLimiter<SplitT> rateLimiter;
    private CompletableFuture<Void> availabilityFuture = null;

    public RateLimitedSourceReader(SourceReader<E, SplitT> sourceReader, RateLimiter<SplitT> rateLimiter) {
        Preconditions.checkNotNull(sourceReader);
        Preconditions.checkNotNull(rateLimiter);
        this.sourceReader = sourceReader;
        this.rateLimiter = rateLimiter;
    }

    @Override
    public void start() {
        this.sourceReader.start();
    }

    @Override
    public InputStatus pollNext(ReaderOutput<E> output) throws Exception {
        if (this.availabilityFuture == null) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        this.availabilityFuture = null;
        InputStatus inputStatus = this.sourceReader.pollNext(output);
        if (inputStatus == InputStatus.MORE_AVAILABLE) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        return inputStatus;
    }

    @Override
    public CompletableFuture<Void> isAvailable() {
        if (this.availabilityFuture == null) {
            this.availabilityFuture = this.rateLimiter.acquire().toCompletableFuture().thenCombine(this.sourceReader.isAvailable(), (l, r) -> null);
        }
        return this.availabilityFuture;
    }

    @Override
    public void addSplits(List<SplitT> splits) {
        this.sourceReader.addSplits(splits);
    }

    @Override
    public void notifyNoMoreSplits() {
        this.sourceReader.notifyNoMoreSplits();
    }

    @Override
    public List<SplitT> snapshotState(long checkpointId) {
        return this.sourceReader.snapshotState(checkpointId);
    }

    @Override
    public void close() throws Exception {
        this.sourceReader.close();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        this.rateLimiter.notifyCheckpointComplete(checkpointId);
    }
}

