/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.block.stream;

import alluxio.client.block.stream.BlockWorkerClient;
import alluxio.client.block.stream.DataReader;
import alluxio.client.block.stream.GrpcBlockingStream;
import alluxio.client.block.stream.GrpcDataMessageBlockingStream;
import alluxio.client.file.FileSystemContext;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.grpc.DataMessage;
import alluxio.grpc.DataMessageMarshaller;
import alluxio.grpc.ReadRequest;
import alluxio.grpc.ReadResponse;
import alluxio.grpc.ReadResponseMarshaller;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.NioDataBuffer;
import alluxio.resource.CloseableResource;
import alluxio.wire.WorkerNetAddress;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.nio.ByteBuffer;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DataReader} that reads a byte range from a block worker over a gRPC stream.
 *
 * <p>The read request is sent to the worker as soon as the reader is constructed. Every chunk
 * handed out by {@link #readChunk()} advances {@link #pos()} and triggers a best-effort
 * acknowledgement to the worker carrying the offset received so far.
 *
 * <p>Instances are created through {@link Factory}.
 */
@NotThreadSafe
public final class GrpcDataReader implements DataReader {
    private static final Logger LOG = LoggerFactory.getLogger(GrpcDataReader.class);

    /** Buffer size, in messages, handed to the underlying blocking stream. */
    private final int mReaderBufferSizeMessages;
    /** Timeout applied to sending the initial request, receiving chunks and completing the stream. */
    private final long mDataTimeoutMs;
    private final FileSystemContext mContext;
    /** Worker client acquired from the context; must be closed to give it back. */
    private final CloseableResource<BlockWorkerClient> mClient;
    private final ReadRequest mReadRequest;
    private final WorkerNetAddress mAddress;
    private final GrpcBlockingStream<ReadRequest, ReadResponse> mStream;
    private final ReadResponseMarshaller mMarshaller;

    /** The next position to read; starts at the request offset and grows by each chunk's size. */
    private long mPosToRead;

    /**
     * Acquires a block worker client, opens the read stream and sends the read request.
     *
     * @param context the file system context supplying configuration and worker clients
     * @param address the address of the worker to read from
     * @param readRequest the complete read request (offset and length already set)
     * @throws IOException if acquiring the worker client or sending the initial request fails
     */
    private GrpcDataReader(FileSystemContext context, WorkerNetAddress address, ReadRequest readRequest)
            throws IOException {
        mContext = context;
        mAddress = address;
        mPosToRead = readRequest.getOffset();
        mReadRequest = readRequest;
        AlluxioConfiguration conf = context.getClusterConf();
        mReaderBufferSizeMessages = conf.getInt(PropertyKey.USER_NETWORK_READER_BUFFER_SIZE_MESSAGES);
        mDataTimeoutMs = conf.getMs(PropertyKey.USER_NETWORK_DATA_TIMEOUT_MS);
        mMarshaller = new ReadResponseMarshaller();
        mClient = mContext.acquireBlockWorkerClient(address);
        try {
            if (conf.getBoolean(PropertyKey.USER_NETWORK_ZEROCOPY_ENABLED)) {
                String desc = describe("Zero Copy GrpcDataReader");
                // No request marshaller is supplied; only responses go through mMarshaller.
                mStream = new GrpcDataMessageBlockingStream<ReadRequest, ReadResponse>(
                        mClient.get()::readBlock, mReaderBufferSizeMessages, desc,
                        (DataMessageMarshaller<ReadRequest>) null, mMarshaller);
            } else {
                String desc = describe("GrpcDataReader");
                mStream = new GrpcBlockingStream<ReadRequest, ReadResponse>(
                        mClient.get()::readBlock, mReaderBufferSizeMessages, desc);
            }
            mStream.send(mReadRequest, mDataTimeoutMs);
        } catch (Exception e) {
            // Give the client back, but never let a cleanup failure hide the original error.
            try {
                mClient.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Builds the description passed to the stream. The detailed form (request and address) is
     * only assembled when debug logging is enabled; otherwise the short default is used.
     *
     * @param defaultDescription the description to use when debug logging is disabled
     * @return the stream description
     */
    private String describe(String defaultDescription) {
        if (!LOG.isDebugEnabled()) {
            return defaultDescription;
        }
        return MoreObjects.toStringHelper(this)
                .add("request", mReadRequest)
                .add("address", mAddress)
                .toString();
    }

    /**
     * Wraps the chunk payload embedded in a response as a read-only {@link DataBuffer}.
     *
     * @param response a response that carries chunk data
     * @return a buffer over the chunk's bytes
     */
    private static DataBuffer wrapChunkData(ReadResponse response) {
        ByteBuffer data = response.getChunk().getData().asReadOnlyByteBuffer();
        long length = data.remaining();
        return new NioDataBuffer(data, length);
    }

    /**
     * @return the position of the next byte to read
     */
    @Override
    public long pos() {
        return mPosToRead;
    }

    /**
     * Receives the next chunk from the worker, advances the read position and acknowledges the
     * new position to the worker on a best-effort basis.
     *
     * @return the next chunk, or {@code null} when the stream yields no further response
     *         (presumably end of stream; confirm against the {@code DataReader} contract)
     * @throws IOException if receiving from the stream fails
     * @throws IllegalStateException if the worker client is already shut down, a response carries
     *         no data, or more bytes than requested have been received
     */
    @Override
    public DataBuffer readChunk() throws IOException {
        Preconditions.checkState(!mClient.get().isShutdown(),
                "Data reader is closed while reading data chunks.");
        DataBuffer buffer = null;
        ReadResponse response = null;
        if (mStream instanceof GrpcDataMessageBlockingStream) {
            DataMessage<ReadResponse, DataBuffer> message =
                    ((GrpcDataMessageBlockingStream<ReadRequest, ReadResponse>) mStream)
                            .receiveDataMessage(mDataTimeoutMs);
            if (message != null) {
                response = message.getMessage();
                buffer = message.getBuffer();
                if (buffer == null && response.hasChunk() && response.getChunk().hasData()) {
                    // No separate buffer was delivered: fall back to the payload inside the message.
                    buffer = wrapChunkData(response);
                }
                Preconditions.checkState(buffer != null, "response should always contain chunk");
            }
        } else {
            response = mStream.receive(mDataTimeoutMs);
            if (response != null) {
                Preconditions.checkState(response.hasChunk() && response.getChunk().hasData(),
                        "response should always contain chunk");
                buffer = wrapChunkData(response);
            }
        }
        if (response == null) {
            return null;
        }
        mPosToRead += buffer.readableBytes();
        try {
            mStream.send(mReadRequest.toBuilder().setOffsetReceived(mPosToRead).build());
        } catch (Exception e) {
            // Deliberately not rethrown: the receipt is best effort and must not fail the read.
            LOG.debug("Failed to send receipt of data to worker {} for request {}: {}.",
                    mAddress, mReadRequest, e.getMessage());
        }
        // The worker must never deliver more bytes than the request asked for.
        Preconditions.checkState(mPosToRead - mReadRequest.getOffset() <= mReadRequest.getLength());
        return buffer;
    }

    /**
     * Closes the stream and waits for it to complete (skipped when the worker client is already
     * shut down), then always closes the marshaller and gives the worker client back.
     *
     * @throws IOException if closing the stream or waiting for completion fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (mClient.get().isShutdown()) {
                return;
            }
            mStream.close();
            mStream.waitForComplete(mDataTimeoutMs);
        } finally {
            // Nested finally: the client must be closed even if closing the marshaller throws.
            try {
                mMarshaller.close();
            } finally {
                mClient.close();
            }
        }
    }

    /**
     * Factory that creates {@link GrpcDataReader}s for a fixed context, worker address and
     * partially-filled read request.
     */
    public static class Factory implements DataReader.Factory {
        private final FileSystemContext mContext;
        private final WorkerNetAddress mAddress;
        /** Request template; offset and length are filled in by {@link #create(long, long)}. */
        private final ReadRequest mReadRequestPartial;

        /**
         * @param context the file system context
         * @param address the worker address to read from
         * @param readRequestPartial the read request without offset and length
         */
        public Factory(FileSystemContext context, WorkerNetAddress address, ReadRequest readRequestPartial) {
            mContext = context;
            mAddress = address;
            mReadRequestPartial = readRequestPartial;
        }

        /**
         * Creates a reader for the given range by completing the partial request.
         *
         * @param offset the offset to start reading from
         * @param len the number of bytes to read
         * @return a new reader whose read request has already been sent
         * @throws IOException if the reader cannot be created
         */
        @Override
        public DataReader create(long offset, long len) throws IOException {
            ReadRequest request = mReadRequestPartial.toBuilder()
                    .setOffset(offset)
                    .setLength(len)
                    .build();
            return new GrpcDataReader(mContext, mAddress, request);
        }

        /**
         * @return always {@code false}
         */
        @Override
        public boolean isShortCircuit() {
            return false;
        }

        /**
         * Does nothing; the factory does no work on close.
         */
        @Override
        public void close() throws IOException {
        }
    }
}

