package org.springframework.cloud.function.grpc;

import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.function.grpc.MessagingServiceGrpc;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:org/springframework/cloud/function/grpc/GrpcUtils.class */
public final class GrpcUtils {
    private static Log logger = LogFactory.getLog(GrpcUtils.class);

    private GrpcUtils() {
    }

    public static GrpcSpringMessage toGrpcSpringMessage(byte[] bArr, Map<String, String> map) {
        return GrpcSpringMessage.newBuilder().setPayload(ByteString.copyFrom(bArr)).putAllHeaders(map).m40build();
    }

    public static GrpcSpringMessage toGrpcSpringMessage(Message<byte[]> message) {
        HashMap hashMap = new HashMap();
        message.getHeaders().forEach((str, obj) -> {
            hashMap.put(str, obj.toString());
        });
        return toGrpcSpringMessage((byte[]) message.getPayload(), hashMap);
    }

    public static Message<byte[]> fromGrpcSpringMessage(GrpcSpringMessage grpcSpringMessage) {
        return MessageBuilder.withPayload(grpcSpringMessage.getPayload().toByteArray()).copyHeaders(grpcSpringMessage.getHeadersMap()).build();
    }

    public static Message<byte[]> requestReply(Message<byte[]> message) {
        return requestReply("localhost", FunctionGrpcProperties.GRPC_PORT, message);
    }

    public static Message<byte[]> requestReply(String str, int i, Message<byte[]> message) {
        ManagedChannel build = ManagedChannelBuilder.forAddress(str, i).usePlaintext().build();
        try {
            Message<byte[]> fromGrpcSpringMessage = fromGrpcSpringMessage(MessagingServiceGrpc.newBlockingStub(build).requestReply(toGrpcSpringMessage(message)));
            build.shutdownNow();
            return fromGrpcSpringMessage;
        } catch (Throwable th) {
            build.shutdownNow();
            throw th;
        }
    }

    public static Flux<Message<byte[]>> biStreaming(Flux<Message<byte[]>> flux) {
        return biStreaming("localhost", FunctionGrpcProperties.GRPC_PORT, flux);
    }

    public static Flux<Message<byte[]>> biStreaming(String str, int i, Flux<Message<byte[]>> flux) {
        ManagedChannel build = ManagedChannelBuilder.forAddress(str, i).usePlaintext().build();
        MessagingServiceGrpc.MessagingServiceStub newStub = MessagingServiceGrpc.newStub(build);
        Sinks.Many onBackpressureBuffer = Sinks.many().unicast().onBackpressureBuffer();
        newStub.biStream(clientResponseObserver(flux, onBackpressureBuffer));
        return onBackpressureBuffer.asFlux().doOnComplete(() -> {
            logger.debug("Shutting down channel");
            build.shutdownNow();
        }).doOnError(th -> {
            th.printStackTrace();
            build.shutdownNow();
        });
    }

    public static Flux<Message<byte[]>> serverStream(String str, int i, Message<byte[]> message) {
        ManagedChannel build = ManagedChannelBuilder.forAddress(str, i).usePlaintext().build();
        Iterator<GrpcSpringMessage> serverStream = MessagingServiceGrpc.newBlockingStub(build).serverStream(toGrpcSpringMessage(message));
        Sinks.Many onBackpressureBuffer = Sinks.many().unicast().onBackpressureBuffer();
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        newSingleThreadExecutor.execute(() -> {
            while (serverStream.hasNext()) {
                onBackpressureBuffer.tryEmitNext(fromGrpcSpringMessage((GrpcSpringMessage) serverStream.next()));
            }
            onBackpressureBuffer.tryEmitComplete();
        });
        return onBackpressureBuffer.asFlux().doOnComplete(() -> {
            build.shutdownNow();
            newSingleThreadExecutor.shutdownNow();
        }).doOnError(th -> {
            th.printStackTrace();
            build.shutdownNow();
            newSingleThreadExecutor.shutdownNow();
        });
    }

    public static Message<byte[]> clientStream(Flux<Message<byte[]>> flux) {
        return clientStream("localhost", FunctionGrpcProperties.GRPC_PORT, flux);
    }

    public static Message<byte[]> clientStream(String str, int i, Flux<Message<byte[]>> flux) {
        final ManagedChannel build = ManagedChannelBuilder.forAddress(str, i).usePlaintext().build();
        final LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(1);
        StreamObserver<GrpcSpringMessage> streamObserver = new StreamObserver<GrpcSpringMessage>() { // from class: org.springframework.cloud.function.grpc.GrpcUtils.1
            public void onNext(GrpcSpringMessage grpcSpringMessage) {
                if (GrpcUtils.logger.isDebugEnabled()) {
                    GrpcUtils.logger.debug("Client received reply: " + grpcSpringMessage);
                }
                linkedBlockingQueue.offer(GrpcUtils.fromGrpcSpringMessage(grpcSpringMessage));
            }

            public void onError(Throwable th) {
                th.printStackTrace();
                build.shutdownNow();
            }

            public void onCompleted() {
                GrpcUtils.logger.info("Client completed");
                build.shutdownNow();
            }
        };
        StreamObserver<GrpcSpringMessage> clientStream = MessagingServiceGrpc.newStub(build).clientStream(streamObserver);
        flux.doOnNext(message -> {
            if (logger.isDebugEnabled()) {
                logger.debug("Client sending: " + message);
            }
            try {
                clientStream.onNext(toGrpcSpringMessage(message));
            } catch (Exception e) {
                clientStream.onError(e);
            }
        }).doOnComplete(() -> {
            clientStream.onCompleted();
        }).doOnError(th -> {
            th.printStackTrace();
            streamObserver.onError(Status.UNKNOWN.withDescription("Error handling request").withCause(th).asRuntimeException());
        }).subscribe();
        try {
            return (Message) linkedBlockingQueue.poll(2147483647L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    private static ClientResponseObserver<GrpcSpringMessage, GrpcSpringMessage> clientResponseObserver(final Flux<Message<byte[]>> flux, final Sinks.Many<Message<byte[]>> many) {
        return new ClientResponseObserver<GrpcSpringMessage, GrpcSpringMessage>() { // from class: org.springframework.cloud.function.grpc.GrpcUtils.2
            ClientCallStreamObserver<GrpcSpringMessage> requestStreamObserver;

            public void beforeStart(final ClientCallStreamObserver<GrpcSpringMessage> clientCallStreamObserver) {
                this.requestStreamObserver = clientCallStreamObserver;
                clientCallStreamObserver.disableAutoInboundFlowControl();
                clientCallStreamObserver.setOnReadyHandler(new Runnable() { // from class: org.springframework.cloud.function.grpc.GrpcUtils.2.1
                    @Override // java.lang.Runnable
                    public void run() {
                        Flux flux2 = flux;
                        ClientCallStreamObserver clientCallStreamObserver2 = clientCallStreamObserver;
                        Flux doOnNext = flux2.doOnNext(message -> {
                            if (GrpcUtils.logger.isDebugEnabled()) {
                                GrpcUtils.logger.debug("Streaming message to function: " + message);
                            }
                            clientCallStreamObserver2.onNext(GrpcUtils.toGrpcSpringMessage(message));
                        });
                        ClientCallStreamObserver clientCallStreamObserver3 = clientCallStreamObserver;
                        doOnNext.doOnComplete(() -> {
                            clientCallStreamObserver3.onCompleted();
                        }).subscribe();
                    }
                });
            }

            public void onNext(GrpcSpringMessage grpcSpringMessage) {
                if (GrpcUtils.logger.isDebugEnabled()) {
                    GrpcUtils.logger.debug("Streaming message from function: " + grpcSpringMessage);
                }
                many.tryEmitNext(GrpcUtils.fromGrpcSpringMessage(grpcSpringMessage));
                this.requestStreamObserver.request(1);
            }

            public void onError(Throwable th) {
                th.printStackTrace();
            }

            public void onCompleted() {
                GrpcUtils.logger.info("Client stream is complete");
                many.tryEmitComplete();
            }
        };
    }
}
