/*
 * Decompiled with CFR 0.152.
 */
package com.salesforce.rxgrpc.stub;

import com.google.common.base.Preconditions;
import com.salesforce.reactivegrpc.common.Function;
import com.salesforce.rxgrpc.stub.RxCallOptions;
import com.salesforce.rxgrpc.stub.RxServerStreamObserverAndPublisher;
import com.salesforce.rxgrpc.stub.RxSubscriberAndServerProducer;
import io.grpc.CallOptions;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.functions.Consumer;

public final class ServerCalls {
    private ServerCalls() {
    }

    public static <TRequest, TResponse> void oneToOne(TRequest request, final StreamObserver<TResponse> responseObserver, Function<TRequest, Single<TResponse>> delegate, final Function<Throwable, Throwable> prepareError) {
        try {
            Single rxResponse = (Single)Preconditions.checkNotNull(delegate.apply(request));
            rxResponse.subscribe(new Consumer<TResponse>(){

                @Override
                public void accept(TResponse value) {
                    if (responseObserver instanceof ServerCallStreamObserver && ((ServerCallStreamObserver)responseObserver).isCancelled()) {
                        return;
                    }
                    responseObserver.onNext(value);
                    responseObserver.onCompleted();
                }
            }, (Consumer<Throwable>)new Consumer<Throwable>(){

                @Override
                public void accept(Throwable throwable) {
                    responseObserver.onError((Throwable)prepareError.apply(throwable));
                }
            });
        }
        catch (Throwable throwable) {
            responseObserver.onError(prepareError.apply(throwable));
        }
    }

    public static <TRequest, TResponse> void oneToMany(TRequest request, StreamObserver<TResponse> responseObserver, Function<TRequest, Flowable<TResponse>> delegate, Function<Throwable, Throwable> prepareError) {
        try {
            Flowable rxResponse = (Flowable)Preconditions.checkNotNull(delegate.apply(request));
            RxSubscriberAndServerProducer serverProducer = rxResponse.subscribeWith(new RxSubscriberAndServerProducer(prepareError::apply));
            serverProducer.subscribe((ServerCallStreamObserver)responseObserver);
        }
        catch (Throwable throwable) {
            responseObserver.onError(prepareError.apply(throwable));
        }
    }

    public static <TRequest, TResponse> StreamObserver<TRequest> manyToOne(final StreamObserver<TResponse> responseObserver, Function<Flowable<TRequest>, Single<TResponse>> delegate, final Function<Throwable, Throwable> prepareError, CallOptions options) {
        int prefetch = RxCallOptions.getPrefetch(options);
        int lowTide = RxCallOptions.getLowTide(options);
        final RxServerStreamObserverAndPublisher streamObserverPublisher = new RxServerStreamObserverAndPublisher((ServerCallStreamObserver)responseObserver, null, prefetch, lowTide);
        try {
            Single rxResponse = (Single)Preconditions.checkNotNull(delegate.apply(Flowable.fromPublisher(streamObserverPublisher)));
            rxResponse.subscribe(new Consumer<TResponse>(){

                @Override
                public void accept(TResponse value) {
                    if (!streamObserverPublisher.isCancelled()) {
                        responseObserver.onNext(value);
                        responseObserver.onCompleted();
                    }
                }
            }, (Consumer<Throwable>)new Consumer<Throwable>(){

                @Override
                public void accept(Throwable throwable) {
                    if (!streamObserverPublisher.isCancelled()) {
                        streamObserverPublisher.abortPendingCancel();
                        responseObserver.onError((Throwable)prepareError.apply(throwable));
                    }
                }
            });
        }
        catch (Throwable throwable) {
            responseObserver.onError(prepareError.apply(throwable));
        }
        return streamObserverPublisher;
    }

    public static <TRequest, TResponse> StreamObserver<TRequest> manyToMany(StreamObserver<TResponse> responseObserver, Function<Flowable<TRequest>, Flowable<TResponse>> delegate, Function<Throwable, Throwable> prepareError, CallOptions options) {
        int prefetch = RxCallOptions.getPrefetch(options);
        int lowTide = RxCallOptions.getLowTide(options);
        RxServerStreamObserverAndPublisher streamObserverPublisher = new RxServerStreamObserverAndPublisher((ServerCallStreamObserver)responseObserver, null, prefetch, lowTide);
        try {
            Flowable rxResponse = (Flowable)Preconditions.checkNotNull(delegate.apply(Flowable.fromPublisher(streamObserverPublisher)));
            RxSubscriberAndServerProducer subscriber = new RxSubscriberAndServerProducer(prepareError::apply);
            subscriber.subscribe((ServerCallStreamObserver)responseObserver);
            rxResponse.subscribe(subscriber);
        }
        catch (Throwable throwable) {
            responseObserver.onError(prepareError.apply(throwable));
        }
        return streamObserverPublisher;
    }

    public static Throwable prepareError(Throwable throwable) {
        if (throwable instanceof StatusException || throwable instanceof StatusRuntimeException) {
            return throwable;
        }
        return Status.fromThrowable((Throwable)throwable).asException();
    }
}

