/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.function.grpc;

import com.google.protobuf.ByteString;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.function.grpc.GrpcSpringMessage;
import org.springframework.cloud.function.grpc.MessagingServiceGrpc;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public final class GrpcUtils {
    private static Log logger = LogFactory.getLog(GrpcUtils.class);

    private GrpcUtils() {
    }

    public static GrpcSpringMessage toGrpcSpringMessage(byte[] payload, Map<String, String> headers) {
        return GrpcSpringMessage.newBuilder().setPayload(ByteString.copyFrom((byte[])payload)).putAllHeaders(headers).build();
    }

    public static GrpcSpringMessage toGrpcSpringMessage(Message<byte[]> message) {
        HashMap<String, String> stringHeaders = new HashMap<String, String>();
        message.getHeaders().forEach((k, v) -> stringHeaders.put((String)k, v.toString()));
        return GrpcUtils.toGrpcSpringMessage((byte[])message.getPayload(), stringHeaders);
    }

    public static Message<byte[]> fromGrpcSpringMessage(GrpcSpringMessage message) {
        return MessageBuilder.withPayload((Object)message.getPayload().toByteArray()).copyHeaders(message.getHeadersMap()).build();
    }

    public static Message<byte[]> requestReply(Message<byte[]> inputMessage) {
        return GrpcUtils.requestReply("localhost", 6048, inputMessage);
    }

    public static Message<byte[]> requestReply(String host, int port, Message<byte[]> inputMessage) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress((String)host, (int)port).usePlaintext().build();
        try {
            MessagingServiceGrpc.MessagingServiceBlockingStub stub = MessagingServiceGrpc.newBlockingStub((Channel)channel);
            try {
                GrpcSpringMessage response = stub.requestReply(GrpcUtils.toGrpcSpringMessage(inputMessage));
                Message<byte[]> message = GrpcUtils.fromGrpcSpringMessage(response);
                return message;
            }
            catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
        finally {
            channel.shutdownNow();
        }
    }

    public static Flux<Message<byte[]>> biStreaming(Flux<Message<byte[]>> inputStream) {
        return GrpcUtils.biStreaming("localhost", 6048, inputStream);
    }

    public static Flux<Message<byte[]>> biStreaming(String host, int port, Flux<Message<byte[]>> inputStream) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress((String)host, (int)port).usePlaintext().build();
        MessagingServiceGrpc.MessagingServiceStub stub = MessagingServiceGrpc.newStub((Channel)channel);
        Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer();
        ClientResponseObserver<GrpcSpringMessage, GrpcSpringMessage> clientResponseObserver = GrpcUtils.clientResponseObserver(inputStream, (Sinks.Many<Message<byte[]>>)sink);
        stub.biStream((StreamObserver<GrpcSpringMessage>)clientResponseObserver);
        return sink.asFlux().doOnComplete(() -> {
            logger.debug((Object)"Shutting down channel");
            channel.shutdownNow();
        }).doOnError(e -> {
            e.printStackTrace();
            channel.shutdownNow();
        });
    }

    public static Flux<Message<byte[]>> serverStream(String host, int port, Message<byte[]> inputMessage) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress((String)host, (int)port).usePlaintext().build();
        MessagingServiceGrpc.MessagingServiceBlockingStub stub = MessagingServiceGrpc.newBlockingStub((Channel)channel);
        Iterator<GrpcSpringMessage> serverStream = stub.serverStream(GrpcUtils.toGrpcSpringMessage(inputMessage));
        Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            while (serverStream.hasNext()) {
                GrpcSpringMessage grpcMessage = (GrpcSpringMessage)serverStream.next();
                sink.tryEmitNext(GrpcUtils.fromGrpcSpringMessage(grpcMessage));
            }
            sink.tryEmitComplete();
        });
        return sink.asFlux().doOnComplete(() -> {
            channel.shutdownNow();
            executor.shutdownNow();
        }).doOnError(e -> {
            e.printStackTrace();
            channel.shutdownNow();
            executor.shutdownNow();
        });
    }

    public static Message<byte[]> clientStream(Flux<Message<byte[]>> inputStream) {
        return GrpcUtils.clientStream("localhost", 6048, inputStream);
    }

    public static Message<byte[]> clientStream(String host, int port, Flux<Message<byte[]>> inputStream) {
        final ManagedChannel channel = ManagedChannelBuilder.forAddress((String)host, (int)port).usePlaintext().build();
        final LinkedBlockingQueue resultRef = new LinkedBlockingQueue(1);
        StreamObserver<GrpcSpringMessage> responseObserver = new StreamObserver<GrpcSpringMessage>(){

            public void onNext(GrpcSpringMessage result) {
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)("Client received reply: " + String.valueOf(result)));
                }
                resultRef.offer(GrpcUtils.fromGrpcSpringMessage(result));
            }

            public void onError(Throwable t) {
                t.printStackTrace();
                channel.shutdownNow();
            }

            public void onCompleted() {
                logger.info((Object)"Client completed");
                channel.shutdownNow();
            }
        };
        MessagingServiceGrpc.MessagingServiceStub asyncStub = MessagingServiceGrpc.newStub((Channel)channel);
        StreamObserver<GrpcSpringMessage> requestObserver = asyncStub.clientStream(responseObserver);
        inputStream.doOnNext(message -> {
            if (logger.isDebugEnabled()) {
                logger.debug((Object)("Client sending: " + String.valueOf(message)));
            }
            try {
                requestObserver.onNext((Object)GrpcUtils.toGrpcSpringMessage((Message<byte[]>)message));
            }
            catch (Exception e) {
                requestObserver.onError((Throwable)e);
            }
        }).doOnComplete(() -> requestObserver.onCompleted()).doOnError(arg_0 -> GrpcUtils.lambda$clientStream$2((StreamObserver)responseObserver, arg_0)).subscribe();
        try {
            return (Message)resultRef.poll(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
    }

    private static ClientResponseObserver<GrpcSpringMessage, GrpcSpringMessage> clientResponseObserver(final Flux<Message<byte[]>> inputStream, final Sinks.Many<Message<byte[]>> sink) {
        return new ClientResponseObserver<GrpcSpringMessage, GrpcSpringMessage>(){
            ClientCallStreamObserver<GrpcSpringMessage> requestStreamObserver;

            public void beforeStart(final ClientCallStreamObserver<GrpcSpringMessage> requestStreamObserver) {
                this.requestStreamObserver = requestStreamObserver;
                requestStreamObserver.disableAutoInboundFlowControl();
                requestStreamObserver.setOnReadyHandler(new Runnable(){
                    final /* synthetic */ 2 this$0;
                    {
                        this.this$0 = this$0;
                    }

                    @Override
                    public void run() {
                        inputStream.doOnNext(request -> {
                            if (logger.isDebugEnabled()) {
                                logger.debug((Object)("Streaming message to function: " + String.valueOf(request)));
                            }
                            requestStreamObserver.onNext((Object)GrpcUtils.toGrpcSpringMessage((Message<byte[]>)request));
                        }).doOnComplete(() -> requestStreamObserver.onCompleted()).subscribe();
                    }
                });
            }

            public void onNext(GrpcSpringMessage message) {
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)("Streaming message from function: " + String.valueOf(message)));
                }
                sink.tryEmitNext(GrpcUtils.fromGrpcSpringMessage(message));
                this.requestStreamObserver.request(1);
            }

            public void onError(Throwable t) {
                t.printStackTrace();
            }

            public void onCompleted() {
                logger.info((Object)"Client stream is complete");
                sink.tryEmitComplete();
            }
        };
    }

    private static /* synthetic */ void lambda$clientStream$2(StreamObserver responseObserver, Throwable e) {
        e.printStackTrace();
        responseObserver.onError((Throwable)Status.UNKNOWN.withDescription("Error handling request").withCause(e).asRuntimeException());
    }
}

