/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.block.stream;

import alluxio.client.block.stream.DataMessageClientResponseObserver;
import alluxio.client.block.stream.GrpcBlockingStream;
import alluxio.grpc.DataMessage;
import alluxio.grpc.DataMessageMarshaller;
import alluxio.network.protocol.databuffer.DataBuffer;
import com.google.common.base.Preconditions;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.function.Function;
import javax.annotation.concurrent.NotThreadSafe;

@NotThreadSafe
public class GrpcDataMessageBlockingStream<ReqT, ResT>
extends GrpcBlockingStream<ReqT, ResT> {
    private final DataMessageMarshaller<ReqT> mRequestMarshaller;
    private final DataMessageMarshaller<ResT> mResponseMarshaller;

    public GrpcDataMessageBlockingStream(Function<StreamObserver<ResT>, StreamObserver<ReqT>> rpcFunc, int bufferSize, String description, DataMessageMarshaller<ReqT> requestMarshaller, DataMessageMarshaller<ResT> responseMarshaller) {
        super(resObserver -> {
            DataMessageClientResponseObserver newObserver = new DataMessageClientResponseObserver(resObserver, requestMarshaller, responseMarshaller);
            StreamObserver requestObserver = (StreamObserver)rpcFunc.apply((StreamObserver)newObserver);
            return requestObserver;
        }, bufferSize, description);
        this.mRequestMarshaller = requestMarshaller;
        this.mResponseMarshaller = responseMarshaller;
    }

    @Override
    public ResT receive(long timeoutMs) throws IOException {
        if (this.mResponseMarshaller == null) {
            return super.receive(timeoutMs);
        }
        DataMessage<ResT, DataBuffer> message = this.receiveDataMessage(timeoutMs);
        if (message == null) {
            return null;
        }
        return (ResT)this.mResponseMarshaller.combineData(message);
    }

    public DataMessage<ResT, DataBuffer> receiveDataMessage(long timeoutMs) throws IOException {
        Preconditions.checkNotNull(this.mResponseMarshaller, (Object)"Cannot retrieve data message without a response marshaller.");
        Object response = super.receive(timeoutMs);
        if (response == null) {
            return null;
        }
        DataBuffer buffer = this.mResponseMarshaller.pollBuffer(response);
        return new DataMessage(response, (Object)buffer);
    }

    public void sendDataMessage(DataMessage<ReqT, DataBuffer> message, long timeoutMs) throws IOException {
        if (this.mRequestMarshaller != null) {
            this.mRequestMarshaller.offerBuffer((DataBuffer)message.getBuffer(), message.getMessage());
        }
        super.send(message.getMessage(), timeoutMs);
    }

    @Override
    public void waitForComplete(long timeoutMs) throws IOException {
        DataMessage<ResT, DataBuffer> message;
        if (this.mResponseMarshaller == null) {
            super.waitForComplete(timeoutMs);
            return;
        }
        while (!this.isCanceled() && (message = this.receiveDataMessage(timeoutMs)) != null) {
            if (message.getBuffer() == null) continue;
            ((DataBuffer)message.getBuffer()).release();
        }
        super.waitForComplete(timeoutMs);
    }
}

