/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.grpc.client;

import io.grpc.stub.StreamObserver;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.grpc.GrpcConfiguration;
import org.apache.camel.component.grpc.GrpcUtils;
import org.apache.camel.component.grpc.client.GrpcExchangeForwarder;

class GrpcStreamingExchangeForwarder
implements GrpcExchangeForwarder {
    private final GrpcConfiguration configuration;
    private final Object grpcStub;
    private final Lock lock = new ReentrantLock();
    private volatile StreamObserver<Object> currentStream;
    private volatile StreamObserver<Object> currentResponseObserver;

    public GrpcStreamingExchangeForwarder(GrpcConfiguration configuration, Object grpcStub) {
        this.configuration = configuration;
        this.grpcStub = grpcStub;
    }

    @Override
    public boolean forward(Exchange exchange, StreamObserver<Object> responseObserver, AsyncCallback callback) {
        Message message = exchange.getIn();
        StreamObserver<Object> streamObserver = this.checkAndRecreateStreamObserver(responseObserver);
        if (message.getHeaders().containsKey("CamelGrpcEventType")) {
            switch ((String)message.getHeader("CamelGrpcEventType", String.class)) {
                case "onNext": {
                    streamObserver.onNext(message.getBody());
                    break;
                }
                case "onError": {
                    streamObserver.onError((Throwable)message.getBody());
                    break;
                }
                case "onCompleted": {
                    streamObserver.onCompleted();
                    break;
                }
            }
        } else {
            streamObserver.onNext(message.getBody());
        }
        callback.done(true);
        return true;
    }

    @Override
    public void forward(Exchange exchange) {
        throw new UnsupportedOperationException("Synchronous call is not supported in streaming mode");
    }

    @Override
    public void shutdown() {
        if (this.currentResponseObserver != null) {
            this.checkAndRecreateStreamObserver(this.currentResponseObserver).onCompleted();
        }
        this.doCloseStream();
    }

    private StreamObserver<Object> checkAndRecreateStreamObserver(StreamObserver<Object> responseObserver) {
        StreamObserver<Object> curResponseObserver;
        StreamObserver<Object> curStream = this.currentStream;
        if (curStream == null) {
            this.lock.lock();
            try {
                if (this.currentStream == null) {
                    this.currentResponseObserver = responseObserver;
                    this.currentStream = this.doCreateStream(responseObserver);
                }
                curStream = this.currentStream;
            }
            finally {
                this.lock.unlock();
            }
        }
        if ((curResponseObserver = this.currentResponseObserver) != null && !curResponseObserver.equals(responseObserver)) {
            throw new IllegalArgumentException("This forwarder must always use the same response observer");
        }
        return curStream;
    }

    private void doCloseStream() {
        this.lock.lock();
        try {
            this.currentStream = null;
            this.currentResponseObserver = null;
        }
        finally {
            this.lock.unlock();
        }
    }

    private StreamObserver<Object> doCreateStream(final StreamObserver<Object> streamObserver) {
        return GrpcUtils.invokeAsyncMethodStreaming(this.grpcStub, this.configuration.getMethod(), new StreamObserver<Object>(){

            public void onNext(Object o) {
                streamObserver.onNext(o);
            }

            public void onError(Throwable throwable) {
                GrpcStreamingExchangeForwarder.this.doCloseStream();
                streamObserver.onError(throwable);
            }

            public void onCompleted() {
                GrpcStreamingExchangeForwarder.this.doCloseStream();
                streamObserver.onCompleted();
            }
        });
    }
}

