/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
import java.util.Queue;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
class AsyncTableResultScanner
implements ResultScanner,
AdvancedScanResultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncTableResultScanner.class);
    private final AsyncTable<AdvancedScanResultConsumer> rawTable;
    private final long maxCacheSize;
    private final Scan scan;
    private final Queue<Result> queue = new ArrayDeque<Result>();
    private ScanMetrics scanMetrics;
    private long cacheSize;
    private boolean closed = false;
    private Throwable error;
    private AdvancedScanResultConsumer.ScanResumer resumer;

    public AsyncTableResultScanner(AsyncTable<AdvancedScanResultConsumer> table, Scan scan, long maxCacheSize) {
        this.rawTable = table;
        this.maxCacheSize = maxCacheSize;
        this.scan = scan;
        table.scan(scan, this);
    }

    private void addToCache(Result result) {
        this.queue.add(result);
        this.cacheSize += ConnectionUtils.calcEstimatedSize(result);
    }

    private void stopPrefetch(AdvancedScanResultConsumer.ScanController controller) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("0x%x", System.identityHashCode(this)) + " stop prefetching when scanning " + this.rawTable.getName() + " as the cache size " + this.cacheSize + " is greater than the maxCacheSize " + this.maxCacheSize);
        }
        this.resumer = controller.suspend();
    }

    @Override
    public synchronized void onNext(Result[] results, AdvancedScanResultConsumer.ScanController controller) {
        assert (results.length > 0);
        if (this.closed) {
            controller.terminate();
            return;
        }
        for (Result result : results) {
            this.addToCache(result);
        }
        this.notifyAll();
        if (this.cacheSize >= this.maxCacheSize) {
            this.stopPrefetch(controller);
        }
    }

    @Override
    public synchronized void onHeartbeat(AdvancedScanResultConsumer.ScanController controller) {
        if (this.closed) {
            controller.terminate();
            return;
        }
        if (this.scan.isNeedCursorResult()) {
            controller.cursor().ifPresent(c -> this.queue.add(Result.createCursorResult(c)));
        }
    }

    @Override
    public synchronized void onError(Throwable error) {
        this.error = error;
        this.notifyAll();
    }

    @Override
    public synchronized void onComplete() {
        this.closed = true;
        this.notifyAll();
    }

    @Override
    public void onScanMetricsCreated(ScanMetrics scanMetrics) {
        this.scanMetrics = scanMetrics;
    }

    private void resumePrefetch() {
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("0x%x", System.identityHashCode(this)) + " resume prefetching");
        }
        this.resumer.resume();
        this.resumer = null;
    }

    @Override
    public synchronized Result next() throws IOException {
        while (this.queue.isEmpty()) {
            if (this.closed) {
                return null;
            }
            if (this.error != null) {
                FutureUtils.rethrow(this.error);
            }
            try {
                this.wait();
            }
            catch (InterruptedException e) {
                throw new InterruptedIOException();
            }
        }
        Result result = this.queue.poll();
        if (!result.isCursor()) {
            this.cacheSize -= ConnectionUtils.calcEstimatedSize(result);
            if (this.resumer != null && this.cacheSize <= this.maxCacheSize / 2L) {
                this.resumePrefetch();
            }
        }
        return result;
    }

    @Override
    public synchronized void close() {
        this.closed = true;
        this.queue.clear();
        this.cacheSize = 0L;
        if (this.resumer != null) {
            this.resumePrefetch();
        }
        this.notifyAll();
    }

    @Override
    public boolean renewLease() {
        return false;
    }

    @VisibleForTesting
    synchronized boolean isSuspended() {
        return this.resumer != null;
    }

    @Override
    public ScanMetrics getScanMetrics() {
        return this.scanMetrics;
    }
}

