/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.gcp.data.firestore.util;

import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.function.Consumer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

public final class ObservableReactiveUtil {
    private ObservableReactiveUtil() {
    }

    public static <ResponseT> Mono<ResponseT> unaryCall(Consumer<StreamObserver<ResponseT>> remoteCall) {
        return Mono.create(sink -> remoteCall.accept(new UnaryStreamObserver((MonoSink)sink)));
    }

    public static <ResponseT> Flux<ResponseT> streamingCall(Consumer<StreamObserver<ResponseT>> remoteCall) {
        return Flux.create(sink -> {
            StreamingObserver observer = new StreamingObserver(sink);
            remoteCall.accept((StreamObserver)observer);
            sink.onRequest(demand -> observer.request(demand));
        });
    }

    private static class UnaryStreamObserver<ResponseT>
    implements StreamObserver<ResponseT> {
        private boolean terminalEventReceived;
        private final MonoSink sink;

        UnaryStreamObserver(MonoSink sink) {
            this.sink = sink;
        }

        public void onNext(ResponseT response) {
            this.terminalEventReceived = true;
            this.sink.success(response);
        }

        public void onError(Throwable throwable) {
            this.terminalEventReceived = true;
            this.sink.error(throwable);
        }

        public void onCompleted() {
            if (!this.terminalEventReceived) {
                this.sink.error((Throwable)new RuntimeException("Unary gRPC call completed without yielding a value or an error"));
            }
        }
    }

    static class StreamingObserver<RequestT, ResponseT>
    implements ClientResponseObserver<RequestT, ResponseT> {
        ClientCallStreamObserver<RequestT> rsObserver;
        FluxSink<ResponseT> sink;

        StreamingObserver(FluxSink<ResponseT> sink) {
            this.sink = sink;
        }

        public void onNext(ResponseT value) {
            this.sink.next(value);
        }

        public void onError(Throwable throwable) {
            this.sink.error(throwable);
        }

        public void onCompleted() {
            this.sink.complete();
        }

        public void beforeStart(ClientCallStreamObserver<RequestT> requestStream) {
            this.rsObserver = requestStream;
            requestStream.disableAutoInboundFlowControl();
            this.sink.onCancel(() -> requestStream.cancel("Flux requested cancel.", null));
        }

        void request(long n) {
            this.rsObserver.request(n > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)n);
        }
    }
}

