/*
 * Decompiled with CFR 0.152.
 */
package com.salesforce.rxgrpc.stub;

import com.google.common.base.Preconditions;
import com.salesforce.reactivegrpc.common.Function;
import com.salesforce.rxgrpc.stub.RxServerStreamObserverAndPublisher;
import com.salesforce.rxgrpc.stub.RxSubscriberAndServerProducer;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.functions.Consumer;

public final class ServerCalls {
    private ServerCalls() {
    }

    public static <TRequest, TResponse> void oneToOne(TRequest request, final StreamObserver<TResponse> responseObserver, Function<Single<TRequest>, Single<TResponse>> delegate) {
        try {
            Single<TRequest> rxRequest = Single.just(request);
            Single<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(rxRequest));
            rxResponse.subscribe(new Consumer<TResponse>(){

                @Override
                public void accept(TResponse value) {
                    if (responseObserver instanceof ServerCallStreamObserver && ((ServerCallStreamObserver)responseObserver).isCancelled()) {
                        return;
                    }
                    responseObserver.onNext(value);
                    responseObserver.onCompleted();
                }
            }, new Consumer<Throwable>(){

                @Override
                public void accept(Throwable throwable) {
                    responseObserver.onError(ServerCalls.prepareError(throwable));
                }
            });
        }
        catch (Throwable throwable) {
            responseObserver.onError(ServerCalls.prepareError(throwable));
        }
    }

    public static <TRequest, TResponse> void oneToMany(TRequest request, StreamObserver<TResponse> responseObserver, Function<Single<TRequest>, Flowable<TResponse>> delegate) {
        try {
            Single<TRequest> rxRequest = Single.just(request);
            Flowable<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(rxRequest));
            RxSubscriberAndServerProducer serverProducer = rxResponse.subscribeWith(new RxSubscriberAndServerProducer());
            serverProducer.subscribe((ServerCallStreamObserver)responseObserver);
        }
        catch (Throwable throwable) {
            responseObserver.onError(ServerCalls.prepareError(throwable));
        }
    }

    public static <TRequest, TResponse> StreamObserver<TRequest> manyToOne(final StreamObserver<TResponse> responseObserver, Function<Flowable<TRequest>, Single<TResponse>> delegate) {
        final RxServerStreamObserverAndPublisher streamObserverPublisher = new RxServerStreamObserverAndPublisher((ServerCallStreamObserver)responseObserver, null);
        try {
            Single<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(Flowable.fromPublisher(streamObserverPublisher)));
            rxResponse.subscribe(new Consumer<TResponse>(){

                @Override
                public void accept(TResponse value) {
                    if (!streamObserverPublisher.isCancelled()) {
                        responseObserver.onNext(value);
                        responseObserver.onCompleted();
                    }
                }
            }, new Consumer<Throwable>(){

                @Override
                public void accept(Throwable throwable) {
                    if (!streamObserverPublisher.isCancelled()) {
                        streamObserverPublisher.abortPendingCancel();
                        responseObserver.onError(ServerCalls.prepareError(throwable));
                    }
                }
            });
        }
        catch (Throwable throwable) {
            responseObserver.onError(ServerCalls.prepareError(throwable));
        }
        return streamObserverPublisher;
    }

    public static <TRequest, TResponse> StreamObserver<TRequest> manyToMany(StreamObserver<TResponse> responseObserver, Function<Flowable<TRequest>, Flowable<TResponse>> delegate) {
        RxServerStreamObserverAndPublisher streamObserverPublisher = new RxServerStreamObserverAndPublisher((ServerCallStreamObserver)responseObserver, null);
        try {
            Flowable<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(Flowable.fromPublisher(streamObserverPublisher)));
            RxSubscriberAndServerProducer subscriber = new RxSubscriberAndServerProducer();
            subscriber.subscribe((ServerCallStreamObserver)responseObserver);
            rxResponse.subscribe(subscriber);
        }
        catch (Throwable throwable) {
            responseObserver.onError(ServerCalls.prepareError(throwable));
        }
        return streamObserverPublisher;
    }

    private static Throwable prepareError(Throwable throwable) {
        if (throwable instanceof StatusException || throwable instanceof StatusRuntimeException) {
            return throwable;
        }
        return Status.fromThrowable(throwable).asException();
    }
}

