/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spanner.r2dbc.v2;

import com.google.cloud.spanner.AsyncResultSet;
import com.google.cloud.spanner.r2dbc.v2.SpannerClientLibraryRow;
import reactor.core.publisher.FluxSink;

class ReactiveResultSetCallback
implements AsyncResultSet.ReadyCallback {
    private FluxSink<SpannerClientLibraryRow> sink;
    private AsyncResultSet spannerResultSet;
    private boolean paused = false;

    ReactiveResultSetCallback(FluxSink<SpannerClientLibraryRow> sink, AsyncResultSet resultSet) {
        this.sink = sink;
        this.spannerResultSet = resultSet;
        this.sink.onRequest(this::resumeOnAddedDemand);
        this.sink.onCancel(() -> ((AsyncResultSet)this.spannerResultSet).cancel());
        this.sink.onDispose(() -> ((AsyncResultSet)this.spannerResultSet).close());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public AsyncResultSet.CallbackResponse cursorReady(AsyncResultSet resultSet) {
        try {
            ReactiveResultSetCallback reactiveResultSetCallback = this;
            synchronized (reactiveResultSetCallback) {
                if (this.sink.requestedFromDownstream() < 1L && !this.paused) {
                    this.paused = true;
                    return AsyncResultSet.CallbackResponse.PAUSE;
                }
            }
            switch (resultSet.tryNext()) {
                case DONE: {
                    this.sink.complete();
                    return AsyncResultSet.CallbackResponse.DONE;
                }
                case OK: {
                    this.sink.next((Object)new SpannerClientLibraryRow(resultSet.getCurrentRowAsStruct()));
                    return AsyncResultSet.CallbackResponse.CONTINUE;
                }
            }
            return AsyncResultSet.CallbackResponse.CONTINUE;
        }
        catch (Exception t) {
            this.sink.error((Throwable)t);
            return AsyncResultSet.CallbackResponse.DONE;
        }
    }

    synchronized void resumeOnAddedDemand(long request) {
        if (this.paused && this.sink.requestedFromDownstream() > 0L) {
            this.spannerResultSet.resume();
            this.paused = false;
        }
    }
}

