/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigtable;

import com.google.bigtable.admin.v2.GetTableRequest;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.Row;
import com.google.bigtable.v2.RowFilter;
import com.google.bigtable.v2.RowRange;
import com.google.bigtable.v2.RowSet;
import com.google.bigtable.v2.SampleRowKeysRequest;
import com.google.bigtable.v2.SampleRowKeysResponse;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.grpc.BigtableSession;
import com.google.cloud.bigtable.grpc.BigtableTableName;
import com.google.cloud.bigtable.grpc.async.BulkMutation;
import com.google.cloud.bigtable.grpc.scanner.FlatRow;
import com.google.cloud.bigtable.grpc.scanner.FlatRowConverter;
import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
import com.google.cloud.bigtable.grpc.scanner.ScanHandler;
import com.google.cloud.bigtable.util.ByteStringComparator;
import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableService;
import org.apache.beam.sdk.io.gcp.bigtable.VendoredListenableFutureAdapter;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ComparisonChain;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closer;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.FutureCallback;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.checkerframework.dataflow.qual.Pure;
import org.checkerframework.dataflow.qual.SideEffectFree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BigtableServiceImpl
implements BigtableService {
    /** SLF4J logger shared by this service and its nested helpers. */
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(BigtableServiceImpl.class);
    // Fraction 0.1 (10%). NOTE(review): presumably the share of JVM memory a read segment may
    // occupy -- BigtableSegmentReaderImpl.create multiplies total memory by the literal 0.1; confirm.
    private static final @UnknownKeyFor @NonNull @Initialized double DEFAULT_BYTE_LIMIT_PERCENTAGE = 0.1;
    // Fraction 0.1 (10%). NOTE(review): presumably the refill threshold -- the
    // BigtableSegmentReaderImpl constructor multiplies the row limit by the literal 0.1; confirm.
    private static final @UnknownKeyFor @NonNull @Initialized double WATERMARK_PERCENTAGE = 0.1;
    // 0x6400000 = 104,857,600 = 100 * 1024 * 1024. NOTE(review): presumably the lower bound on the
    // segment byte budget (the literal 1.048576E8 appears in BigtableSegmentReaderImpl.create).
    private static final @UnknownKeyFor @NonNull @Initialized long MIN_BYTE_BUFFER_SIZE = 0x6400000L;
    /** Options handed to every {@link BigtableSession} this service opens. */
    private final @UnknownKeyFor @NonNull @Initialized BigtableOptions options;

    /**
     * Creates a service bound to the given Bigtable options.
     *
     * @param options options used for every session opened by this service
     */
    public BigtableServiceImpl(
            @UnknownKeyFor @NonNull @Initialized BigtableOptions options) {
        this.options = options;
    }

    /**
     * Returns the options this service was constructed with.
     *
     * @return the same {@link BigtableOptions} instance passed to the constructor
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized BigtableOptions getBigtableOptions() {
        return options;
    }

    /**
     * Opens a new session and returns a writer for the given table.
     *
     * <p>The returned writer owns the session and closes it in its own {@code close()}. If
     * building the writer fails after the session has been opened, the session is closed here
     * so it does not leak.
     *
     * @param tableId id of the table to write to, relative to the configured instance
     * @return a writer bound to a freshly opened session
     * @throws IOException if the session cannot be opened
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized BigtableWriterImpl openForWriting(@UnknownKeyFor @NonNull @Initialized String tableId) throws @UnknownKeyFor @NonNull @Initialized IOException {
        BigtableSession session = new BigtableSession(this.options);
        try {
            BigtableTableName tableName = this.options.getInstanceName().toTableName(tableId);
            return new BigtableWriterImpl(session, tableName);
        }
        catch (RuntimeException | Error failure) {
            // Nobody else holds the session yet; release it before propagating the failure.
            try {
                session.close();
            }
            catch (Exception closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Checks whether the given table exists in the configured instance.
     *
     * <p>A temporary session is opened for the lookup and always closed before returning. The
     * session is managed by try-with-resources, so a {@link StatusRuntimeException} raised while
     * opening, using or closing it goes through the same handler.
     *
     * @param tableId id of the table to look up
     * @return {@code true} if the admin client returned the table, {@code false} if the lookup
     *     failed with {@code NOT_FOUND}
     * @throws IOException if the lookup fails with any other gRPC status, or the session cannot
     *     be opened or closed
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized boolean tableExists(@UnknownKeyFor @NonNull @Initialized String tableId) throws @UnknownKeyFor @NonNull @Initialized IOException {
        try (BigtableSession session = new BigtableSession(this.options)) {
            GetTableRequest getTable = GetTableRequest.newBuilder()
                    .setName(this.options.getInstanceName().toTableNameStr(tableId))
                    .build();
            session.getTableAdminClient().getTable(getTable);
            return true;
        }
        catch (StatusRuntimeException e) {
            if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
                return false;
            }
            String message = String.format("Error checking whether table %s (BigtableOptions %s) exists", tableId, this.options);
            LOG.error(message, e);
            throw new IOException(message, e);
        }
    }

    /**
     * Renders this service as its class name plus the configured options.
     *
     * @return a string built with {@code MoreObjects.toStringHelper}
     */
    @Override
    @SideEffectFree
    public @UnknownKeyFor @NonNull @Initialized String toString() {
        return MoreObjects.toStringHelper(BigtableServiceImpl.class)
                .add("options", options)
                .toString();
    }

    /**
     * Opens a new session and returns a reader for the given source.
     *
     * <p>A segmented reader is used when the source specifies a maximum buffer element count;
     * otherwise a plain streaming reader is returned. The reader owns the session. If building
     * the reader fails, the session is closed here so it does not leak.
     *
     * @param source the source describing table, ranges and filter
     * @return a reader bound to a freshly opened session
     * @throws IOException if the session cannot be opened
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized BigtableService.Reader createReader(@UnknownKeyFor @NonNull @Initialized BigtableIO.BigtableSource source) throws @UnknownKeyFor @NonNull @Initialized IOException {
        BigtableSession session = new BigtableSession(this.options);
        try {
            if (source.getMaxBufferElementCount() != null) {
                return BigtableSegmentReaderImpl.create(session, source);
            }
            return new BigtableReaderImpl(session, source);
        }
        catch (RuntimeException | Error failure) {
            // No reader took ownership of the session; release it before propagating.
            try {
                session.close();
            }
            catch (Exception closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Fetches sample row keys for the source's table using a short-lived session.
     *
     * @param source the source whose table id is sampled
     * @return the responses returned by the data client's {@code sampleRowKeys} call
     * @throws IOException if the session cannot be opened or closed
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized SampleRowKeysResponse> getSampleRowKeys(@UnknownKeyFor @NonNull @Initialized BigtableIO.BigtableSource source) throws @UnknownKeyFor @NonNull @Initialized IOException {
        try (BigtableSession session = new BigtableSession(options)) {
            String qualifiedTableName = options.getInstanceName().toTableNameStr(source.getTableId().get());
            SampleRowKeysRequest sampleRequest = SampleRowKeysRequest.newBuilder()
                    .setTableName(qualifiedTableName)
                    .build();
            return session.getDataClient().sampleRowKeys(sampleRequest);
        }
    }

    /**
     * Builds the API-request-count metric used for ReadRows calls against a table.
     *
     * @param session session whose options supply the project and instance ids
     * @param tableId id of the table being read
     * @return a metric labelled with service, method, resource, project, instance and table
     */
    @VisibleForTesting
    public static @UnknownKeyFor @NonNull @Initialized ServiceCallMetric createCallMetric(@UnknownKeyFor @NonNull @Initialized BigtableSession session, @UnknownKeyFor @NonNull @Initialized String tableId) {
        BigtableOptions sessionOptions = session.getOptions();
        String projectId = sessionOptions.getProjectId();
        String instanceId = sessionOptions.getInstanceId();

        HashMap<String, String> labels = new HashMap<String, String>();
        labels.put("PTRANSFORM", "");
        labels.put("SERVICE", "BigTable");
        labels.put("METHOD", "google.bigtable.v2.ReadRows");
        labels.put("RESOURCE", GcpResourceIdentifiers.bigtableResource(projectId, instanceId, tableId));
        labels.put("BIGTABLE_PROJECT_ID", projectId);
        labels.put("INSTANCE_ID", instanceId);
        labels.put("TABLE_ID", GcpResourceIdentifiers.bigtableTableID(projectId, instanceId, tableId));
        return new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, labels);
    }

    /**
     * Comparable view of the end boundary of a {@link RowRange}.
     *
     * <p>An empty key stands for "no upper bound" and orders after every non-empty key. For equal
     * keys, an open end orders before a closed end.
     */
    private static final class EndPoint
    implements Comparable<EndPoint> {
        private final @UnknownKeyFor @NonNull @Initialized ByteString value;
        private final @UnknownKeyFor @NonNull @Initialized boolean isClosed;

        /**
         * Derives the end boundary of a row range.
         *
         * @param rowRange range to inspect
         * @return the end point; an unset or empty end key becomes an empty, closed end point
         * @throws IllegalArgumentException if the end key case is not one of the known cases
         */
        @Nonnull
        static @UnknownKeyFor @NonNull @Initialized EndPoint extract(@Nonnull @UnknownKeyFor @NonNull @Initialized RowRange rowRange) {
            switch (rowRange.getEndKeyCase()) {
                case ENDKEY_NOT_SET:
                    return new EndPoint(ByteString.EMPTY, true);
                case END_KEY_CLOSED:
                    return new EndPoint(rowRange.getEndKeyClosed(), true);
                case END_KEY_OPEN: {
                    ByteString openKey = rowRange.getEndKeyOpen();
                    // An empty open end key is treated the same as no end key at all.
                    return openKey.isEmpty()
                            ? new EndPoint(ByteString.EMPTY, true)
                            : new EndPoint(openKey, false);
                }
                default:
                    throw new IllegalArgumentException("Unknown endKeyCase: " + rowRange.getEndKeyCase());
            }
        }

        private EndPoint(@Nonnull @UnknownKeyFor @NonNull @Initialized ByteString value, @UnknownKeyFor @NonNull @Initialized boolean isClosed) {
            this.value = value;
            this.isClosed = isClosed;
        }

        /** Orders by emptiness (non-empty first), then key bytes, then openness (open first). */
        @Override
        @Pure
        public @UnknownKeyFor @NonNull @Initialized int compareTo(@Nonnull @UnknownKeyFor @NonNull @Initialized EndPoint o) {
            return ComparisonChain.start()
                    .compareFalseFirst(value.isEmpty(), o.value.isEmpty())
                    .compare(value, o.value, ByteStringComparator.INSTANCE)
                    .compareFalseFirst(isClosed, o.isClosed)
                    .result();
        }
    }

    /**
     * Comparable view of the start boundary of a {@link RowRange}.
     *
     * <p>An empty key stands for "no lower bound" and orders before every non-empty key. For
     * equal keys, a closed start orders before an open start.
     */
    private static final class StartPoint
    implements Comparable<StartPoint> {
        private final @UnknownKeyFor @NonNull @Initialized ByteString value;
        private final @UnknownKeyFor @NonNull @Initialized boolean isClosed;

        /**
         * Derives the start boundary of a row range.
         *
         * @param rowRange range to inspect
         * @return the start point; an unset or empty start key becomes an empty, closed start point
         * @throws IllegalArgumentException if the start key case is not one of the known cases
         */
        @Nonnull
        static @UnknownKeyFor @NonNull @Initialized StartPoint extract(@Nonnull @UnknownKeyFor @NonNull @Initialized RowRange rowRange) {
            switch (rowRange.getStartKeyCase()) {
                case STARTKEY_NOT_SET:
                    return new StartPoint(ByteString.EMPTY, true);
                case START_KEY_CLOSED:
                    return new StartPoint(rowRange.getStartKeyClosed(), true);
                case START_KEY_OPEN: {
                    ByteString openKey = rowRange.getStartKeyOpen();
                    // An empty open start key is treated the same as no start key at all.
                    return openKey.isEmpty()
                            ? new StartPoint(ByteString.EMPTY, true)
                            : new StartPoint(openKey, false);
                }
                default:
                    throw new IllegalArgumentException("Unknown startKeyCase: " + rowRange.getStartKeyCase());
            }
        }

        private StartPoint(@Nonnull @UnknownKeyFor @NonNull @Initialized ByteString value, @UnknownKeyFor @NonNull @Initialized boolean isClosed) {
            this.value = value;
            this.isClosed = isClosed;
        }

        /** Orders by emptiness (empty first), then key bytes, then openness (closed first). */
        @Override
        @Pure
        public @UnknownKeyFor @NonNull @Initialized int compareTo(@Nonnull @UnknownKeyFor @NonNull @Initialized StartPoint o) {
            return ComparisonChain.start()
                    .compareTrueFirst(value.isEmpty(), o.value.isEmpty())
                    .compare(value, o.value, ByteStringComparator.INSTANCE)
                    .compareTrueFirst(isClosed, o.isClosed)
                    .result();
        }
    }

    /**
     * Writer that batches row mutations through a {@link BulkMutation} on an owned session.
     *
     * <p>After {@link #close()} both the bulk mutation and the session references are cleared.
     */
    @VisibleForTesting
    static class BigtableWriterImpl
    implements BigtableService.Writer {
        private @UnknownKeyFor @NonNull @Initialized BigtableSession session;
        private @UnknownKeyFor @NonNull @Initialized BulkMutation bulkMutation;
        private @UnknownKeyFor @NonNull @Initialized BigtableTableName tableName;

        /**
         * @param session session owned by this writer; closed in {@link #close()}
         * @param tableName fully qualified table the bulk mutation targets
         */
        BigtableWriterImpl(@UnknownKeyFor @NonNull @Initialized BigtableSession session, @UnknownKeyFor @NonNull @Initialized BigtableTableName tableName) {
            this.session = session;
            this.bulkMutation = session.createBulkMutation(tableName);
            this.tableName = tableName;
        }

        /**
         * Flushes the bulk mutation, if the writer has not been closed yet.
         *
         * @throws IOException wrapping an {@link InterruptedException}; the interrupt flag is restored
         */
        @Override
        public void flush() throws @UnknownKeyFor @NonNull @Initialized IOException {
            if (this.bulkMutation != null) {
                try {
                    this.bulkMutation.flush();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException(e);
                }
            }
        }

        /**
         * Flushes the bulk mutation (if still open) and then closes the session.
         *
         * <p>The session is closed even when the flush fails.
         *
         * @throws IOException if the flush is interrupted or closing the session fails
         */
        @Override
        public void close() throws @UnknownKeyFor @NonNull @Initialized IOException {
            try {
                if (this.bulkMutation != null) {
                    try {
                        this.bulkMutation.flush();
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IOException(e);
                    }
                    this.bulkMutation = null;
                }
            }
            finally {
                if (this.session != null) {
                    this.session.close();
                    this.session = null;
                }
            }
        }

        /**
         * Queues one row's mutations on the bulk mutation and reports the outcome asynchronously.
         *
         * <p>Each call records one API-request-count metric entry when the mutation finishes:
         * {@code "ok"} on success, the gRPC status code name for a {@link StatusRuntimeException}
         * (the same form the readers in this file report), and {@code "unknown"} otherwise.
         *
         * @param record row key paired with the mutations to apply to that row
         * @return a stage completed with the response, or exceptionally with the failure
         * @throws IOException declared by the interface; not thrown directly by this body
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized CompletionStage<@UnknownKeyFor @NonNull @Initialized MutateRowResponse> writeRecord(@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized ByteString, @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized Mutation>> record) throws @UnknownKeyFor @NonNull @Initialized IOException {
            MutateRowsRequest.Entry request = MutateRowsRequest.Entry.newBuilder()
                    .setRowKey(record.getKey())
                    .addAllMutations(record.getValue())
                    .build();

            HashMap<String, String> baseLabels = new HashMap<String, String>();
            baseLabels.put("PTRANSFORM", "");
            baseLabels.put("SERVICE", "BigTable");
            baseLabels.put("METHOD", "google.bigtable.v2.MutateRows");
            baseLabels.put("RESOURCE", GcpResourceIdentifiers.bigtableResource(this.session.getOptions().getProjectId(), this.session.getOptions().getInstanceId(), this.tableName.getTableId()));
            baseLabels.put("BIGTABLE_PROJECT_ID", this.session.getOptions().getProjectId());
            baseLabels.put("INSTANCE_ID", this.session.getOptions().getInstanceId());
            baseLabels.put("TABLE_ID", GcpResourceIdentifiers.bigtableTableID(this.session.getOptions().getProjectId(), this.session.getOptions().getInstanceId(), this.tableName.getTableId()));
            final ServiceCallMetric serviceCallMetric = new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);

            final CompletableFuture<MutateRowResponse> result = new CompletableFuture<MutateRowResponse>();
            Futures.addCallback(new VendoredListenableFutureAdapter<>(this.bulkMutation.add(request)), new FutureCallback<MutateRowResponse>(){

                @Override
                public void onSuccess(@UnknownKeyFor @NonNull @Initialized MutateRowResponse mutateRowResponse) {
                    result.complete(mutateRowResponse);
                    serviceCallMetric.call("ok");
                }

                @Override
                public void onFailure(@UnknownKeyFor @NonNull @Initialized Throwable throwable) {
                    if (throwable instanceof StatusRuntimeException) {
                        // Report the status code by name. Passing Code.value() (an int) would
                        // select the int overload of call(), which is not what the readers use.
                        serviceCallMetric.call(((StatusRuntimeException)throwable).getStatus().getCode().toString());
                    } else {
                        serviceCallMetric.call("unknown");
                    }
                    result.completeExceptionally(throwable);
                }
            }, MoreExecutors.directExecutor());
            return result;
        }
    }

    /**
     * Reader that pulls rows in bounded segments and buffers them in memory.
     *
     * <p>Each segment is one ReadRows request limited by a row count and cut short once the
     * accumulated serialized size exceeds {@code maxSegmentByteSize}. When a segment ends at one
     * of those limits, a follow-up request is derived that resumes after the last row received.
     * The next segment is requested once the buffer drops below the refill watermark.
     */
    @VisibleForTesting
    static class BigtableSegmentReaderImpl
    implements BigtableService.Reader {
        private @UnknownKeyFor @NonNull @Initialized BigtableSession session;
        /** Request for the next segment, or {@code null} once there is nothing left to read. */
        private @Nullable @UnknownKeyFor @Initialized ReadRowsRequest nextRequest;
        private @Nullable @UnknownKeyFor @Initialized Row currentRow;
        /** In-flight segment fetch, or {@code null} when no fetch is pending. */
        private @Nullable @UnknownKeyFor @Initialized Future<@UnknownKeyFor @NonNull @Initialized UpstreamResults> future;
        private final @UnknownKeyFor @NonNull @Initialized Queue<@UnknownKeyFor @NonNull @Initialized Row> buffer;
        private final @UnknownKeyFor @NonNull @Initialized int refillSegmentWaterMark;
        private final @UnknownKeyFor @NonNull @Initialized long maxSegmentByteSize;
        private @UnknownKeyFor @NonNull @Initialized ServiceCallMetric serviceCallMetric;

        /**
         * Builds a segmented reader for the given source.
         *
         * @param session session the reader will own and close
         * @param source source supplying table id, key ranges, row filter and buffer element count
         * @return a reader whose first request covers all source ranges (or the whole table when
         *     the source has no ranges)
         */
        static @UnknownKeyFor @NonNull @Initialized BigtableSegmentReaderImpl create(@UnknownKeyFor @NonNull @Initialized BigtableSession session, @UnknownKeyFor @NonNull @Initialized BigtableIO.BigtableSource source) {
            RowSet.Builder rowSetBuilder = RowSet.newBuilder();
            if (source.getRanges().isEmpty()) {
                // No ranges: add a single default (unbounded) range.
                rowSetBuilder = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance());
            } else {
                for (ByteKeyRange beamRange : source.getRanges()) {
                    RowRange.Builder rangeBuilder = rowSetBuilder.addRowRangesBuilder();
                    rangeBuilder
                            .setStartKeyClosed(ByteString.copyFrom(beamRange.getStartKey().getValue()))
                            .setEndKeyOpen(ByteString.copyFrom(beamRange.getEndKey().getValue()));
                }
            }
            RowSet rowSet = rowSetBuilder.build();
            RowFilter filter = MoreObjects.firstNonNull(source.getRowFilter(), RowFilter.getDefaultInstance());
            // Byte budget per segment: at least 104,857,600 bytes (100 * 1024 * 1024),
            // otherwise 10% of the JVM's current total memory.
            long maxSegmentByteSize = (long)Math.max(1.048576E8, (double)Runtime.getRuntime().totalMemory() * 0.1);
            return new BigtableSegmentReaderImpl(session, session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()), rowSet, source.getMaxBufferElementCount(), maxSegmentByteSize, filter, BigtableServiceImpl.createCallMetric(session, source.getTableId().get()));
        }

        /**
         * @param session session owned by this reader
         * @param tableName fully qualified table name used in every request
         * @param rowSet rows to read; a default-instance row set is replaced by one unbounded range
         * @param maxRowsInBuffer row limit applied to every segment request
         * @param maxSegmentByteSize serialized-size threshold after which a segment is cancelled
         * @param filter row filter applied to every request
         * @param serviceCallMetric metric updated once per completed segment fetch
         */
        @VisibleForTesting
        BigtableSegmentReaderImpl(@UnknownKeyFor @NonNull @Initialized BigtableSession session, @UnknownKeyFor @NonNull @Initialized String tableName, @UnknownKeyFor @NonNull @Initialized RowSet rowSet, @UnknownKeyFor @NonNull @Initialized int maxRowsInBuffer, @UnknownKeyFor @NonNull @Initialized long maxSegmentByteSize, @UnknownKeyFor @NonNull @Initialized RowFilter filter, @UnknownKeyFor @NonNull @Initialized ServiceCallMetric serviceCallMetric) {
            if (rowSet.equals(rowSet.getDefaultInstanceForType())) {
                rowSet = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
            }
            ReadRowsRequest request = ReadRowsRequest.newBuilder()
                    .setTableName(tableName)
                    .setRows(rowSet)
                    .setFilter(filter)
                    .setRowsLimit((long)maxRowsInBuffer)
                    .build();
            this.session = session;
            this.nextRequest = request;
            this.maxSegmentByteSize = maxSegmentByteSize;
            this.serviceCallMetric = serviceCallMetric;
            this.buffer = new ArrayDeque<Row>();
            // Refill once fewer than 10% of the per-segment row limit remain buffered.
            this.refillSegmentWaterMark = (int)((double)request.getRowsLimit() * 0.1);
        }

        /**
         * Starts the first segment fetch and moves to the first row.
         *
         * @return {@code true} if a first row is available
         * @throws IOException if the fetch fails or the wait is interrupted
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized boolean start() throws @UnknownKeyFor @NonNull @Initialized IOException {
            this.future = this.fetchNextSegment();
            return this.advance();
        }

        /**
         * Moves to the next buffered row, fetching or awaiting another segment when needed.
         *
         * @return {@code true} if a row is available via {@link #getCurrentRow()}
         * @throws IOException if a pending fetch fails or the wait is interrupted
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized boolean advance() throws @UnknownKeyFor @NonNull @Initialized IOException {
            if (this.buffer.size() < this.refillSegmentWaterMark && this.future == null) {
                this.future = this.fetchNextSegment();
            }
            if (this.buffer.isEmpty() && this.future != null) {
                this.waitReadRowsFuture();
            }
            this.currentRow = this.buffer.poll();
            return this.currentRow != null;
        }

        /**
         * Issues the request stored in {@code nextRequest} asynchronously.
         *
         * @return a future yielding the rows of the segment and the request for the following
         *     segment; already completed with an empty result when there is no next request
         */
        private @UnknownKeyFor @NonNull @Initialized Future<@UnknownKeyFor @NonNull @Initialized UpstreamResults> fetchNextSegment() {
            final SettableFuture<UpstreamResults> f = SettableFuture.create();
            // Snapshot the request so the callbacks work on the one that was actually sent.
            final ReadRowsRequest request = this.nextRequest;
            if (request == null) {
                f.set(new UpstreamResults(ImmutableList.of(), null));
                return f;
            }
            final AtomicReference<ScanHandler> atomicScanHandler = new AtomicReference<ScanHandler>();
            ScanHandler handler = this.session.getDataClient().readFlatRows(request, new StreamObserver<FlatRow>(){
                @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Row> rows = new ArrayList<Row>();
                @UnknownKeyFor @NonNull @Initialized long currentByteSize = 0L;
                @UnknownKeyFor @NonNull @Initialized boolean byteLimitReached = false;

                @Override
                public void onNext(@UnknownKeyFor @NonNull @Initialized FlatRow flatRow) {
                    Row row = FlatRowConverter.convert(flatRow);
                    this.currentByteSize += (long)row.getSerializedSize();
                    this.rows.add(row);
                    if (this.currentByteSize > maxSegmentByteSize) {
                        this.byteLimitReached = true;
                        // NOTE(review): assumes the handler reference has been published before
                        // the first row arrives; otherwise get() returns null here -- confirm.
                        atomicScanHandler.get().cancel();
                    }
                }

                @Override
                public void onError(@UnknownKeyFor @NonNull @Initialized Throwable e) {
                    f.setException(e);
                }

                @Override
                public void onCompleted() {
                    ReadRowsRequest nextNextRequest = null;
                    // A follow-up is only needed when this segment stopped at a limit. The
                    // non-empty guard avoids indexing an empty list when the row limit is 0 and
                    // no rows were returned, which would otherwise leave the future incomplete.
                    boolean stoppedAtLimit = this.byteLimitReached || (long)this.rows.size() == request.getRowsLimit();
                    if (stoppedAtLimit && !this.rows.isEmpty()) {
                        nextNextRequest = truncateRequest(request, this.rows.get(this.rows.size() - 1).getKey());
                    }
                    f.set(new UpstreamResults(this.rows, nextNextRequest));
                }
            });
            atomicScanHandler.set(handler);
            return f;
        }

        /**
         * Blocks until the pending fetch finishes, then moves its rows into the buffer.
         *
         * @throws IOException wrapping the interruption or the cause of a failed fetch
         */
        private void waitReadRowsFuture() throws @UnknownKeyFor @NonNull @Initialized IOException {
            try {
                UpstreamResults r = this.future.get();
                this.buffer.addAll(r.rows);
                this.nextRequest = r.nextRequest;
                this.future = null;
                this.serviceCallMetric.call("ok");
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
            catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof StatusRuntimeException) {
                    this.serviceCallMetric.call(((StatusRuntimeException)cause).getStatus().getCode().toString());
                }
                throw new IOException(cause);
            }
        }

        /**
         * Derives the request that resumes reading strictly after {@code lastKey}.
         *
         * <p>Ranges starting after {@code lastKey} are kept as they are, ranges ending at or
         * before it are dropped, and a range containing it is restarted with an open start at
         * {@code lastKey}.
         *
         * @param request the request whose ranges are trimmed
         * @param lastKey key of the last row already received
         * @return the trimmed request, or {@code null} when no range remains
         */
        private @Nullable @UnknownKeyFor @Initialized ReadRowsRequest truncateRequest(@UnknownKeyFor @NonNull @Initialized ReadRowsRequest request, @UnknownKeyFor @NonNull @Initialized ByteString lastKey) {
            RowSet.Builder segment = RowSet.newBuilder();
            for (RowRange rowRange : request.getRows().getRowRangesList()) {
                int startCmp = StartPoint.extract(rowRange).compareTo(new StartPoint(lastKey, true));
                int endCmp = EndPoint.extract(rowRange).compareTo(new EndPoint(lastKey, true));
                if (startCmp > 0) {
                    // Entire range lies after the last key: untouched.
                    segment.addRowRanges(rowRange);
                    continue;
                }
                if (endCmp <= 0) {
                    // Entire range lies at or before the last key: already consumed.
                    continue;
                }
                RowRange subRange = rowRange.toBuilder().setStartKeyOpen(lastKey).build();
                segment.addRowRanges(subRange);
            }
            if (segment.getRowRangesCount() == 0) {
                return null;
            }
            ReadRowsRequest.Builder requestBuilder = request.toBuilder();
            requestBuilder.clearRows();
            return requestBuilder.setRows(segment).build();
        }

        /**
         * Closes the underlying session.
         *
         * @throws IOException if closing the session fails
         */
        @Override
        public void close() throws @UnknownKeyFor @NonNull @Initialized IOException {
            this.session.close();
        }

        /**
         * @return the row selected by the last successful {@code start()}/{@code advance()}
         * @throws NoSuchElementException if there is no current row
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized Row getCurrentRow() throws @UnknownKeyFor @NonNull @Initialized NoSuchElementException {
            if (this.currentRow == null) {
                throw new NoSuchElementException();
            }
            return this.currentRow;
        }

        /** Rows of one fetched segment plus the request for the segment after it, if any. */
        private static class UpstreamResults {
            private final @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Row> rows;
            private final @Nullable @UnknownKeyFor @Initialized ReadRowsRequest nextRequest;

            private UpstreamResults(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Row> rows, @Nullable @UnknownKeyFor @Initialized ReadRowsRequest nextRequest) {
                this.rows = rows;
                this.nextRequest = nextRequest;
            }
        }
    }

    /**
     * Reader that streams all rows of a source through a single ReadRows scan.
     *
     * <p>The scan is opened in {@link #start()}; {@link #close()} releases both the scanner and
     * the session and clears both references.
     */
    @VisibleForTesting
    static class BigtableReaderImpl
    implements BigtableService.Reader {
        private @UnknownKeyFor @NonNull @Initialized BigtableSession session;
        private final @UnknownKeyFor @NonNull @Initialized BigtableIO.BigtableSource source;
        private @UnknownKeyFor @NonNull @Initialized ResultScanner<@UnknownKeyFor @NonNull @Initialized Row> results;
        private @UnknownKeyFor @NonNull @Initialized Row currentRow;

        /**
         * @param session session owned by this reader
         * @param source source supplying table id, key ranges and optional row filter
         */
        @VisibleForTesting
        BigtableReaderImpl(@UnknownKeyFor @NonNull @Initialized BigtableSession session, @UnknownKeyFor @NonNull @Initialized BigtableIO.BigtableSource source) {
            this.session = session;
            this.source = source;
        }

        /**
         * Opens the scan over the source's ranges and moves to the first row.
         *
         * <p>Records {@code "ok"} on the call metric when the scan opens, or the gRPC status code
         * name when opening fails with a {@link StatusRuntimeException}, which is then rethrown.
         *
         * @return {@code true} if a first row is available
         * @throws IOException if advancing to the first row fails
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized boolean start() throws @UnknownKeyFor @NonNull @Initialized IOException {
            RowSet.Builder ranges = RowSet.newBuilder();
            for (ByteKeyRange sourceRange : source.getRanges()) {
                ByteString startKey = ByteString.copyFrom(sourceRange.getStartKey().getValue());
                ByteString endKey = ByteString.copyFrom(sourceRange.getEndKey().getValue());
                ranges = ranges.addRowRanges(RowRange.newBuilder().setStartKeyClosed(startKey).setEndKeyOpen(endKey));
            }
            RowSet rowSet = ranges.build();

            String qualifiedTableName = session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
            ServiceCallMetric callMetric = BigtableServiceImpl.createCallMetric(session, source.getTableId().get());

            ReadRowsRequest.Builder requestBuilder = ReadRowsRequest.newBuilder()
                    .setRows(rowSet)
                    .setTableName(qualifiedTableName);
            if (source.getRowFilter() != null) {
                requestBuilder.setFilter(source.getRowFilter());
            }

            try {
                results = session.getDataClient().readRows(requestBuilder.build());
                callMetric.call("ok");
            }
            catch (StatusRuntimeException e) {
                callMetric.call(e.getStatus().getCode().toString());
                throw e;
            }
            return advance();
        }

        /**
         * Reads the next row from the scanner.
         *
         * @return {@code true} if the scanner returned a row, {@code false} if it returned null
         * @throws IOException if the scanner fails
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized boolean advance() throws @UnknownKeyFor @NonNull @Initialized IOException {
            Row next = results.next();
            currentRow = next;
            return next != null;
        }

        /**
         * Closes the scanner (if one was opened) and the session, then clears both references.
         *
         * <p>A second call is a no-op because the session reference is already null.
         *
         * @throws IOException if closing the session or the scanner fails
         */
        @Override
        public void close() throws @UnknownKeyFor @NonNull @Initialized IOException {
            if (session == null) {
                // Already closed.
                return;
            }
            try (Closer closer = Closer.create()) {
                // The Closer closes the scanner when this try block exits, i.e. after the
                // session close below has been attempted.
                if (results != null) {
                    closer.register(results);
                    results = null;
                }
                session.close();
            }
            finally {
                session = null;
            }
        }

        /**
         * @return the row selected by the last successful {@code start()}/{@code advance()}
         * @throws NoSuchElementException if there is no current row
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized Row getCurrentRow() throws @UnknownKeyFor @NonNull @Initialized NoSuchElementException {
            Row row = currentRow;
            if (row == null) {
                throw new NoSuchElementException();
            }
            return row;
        }
    }
}

