/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.block.stream;

import alluxio.client.WriteType;
import alluxio.client.block.stream.BlockWorkerClient;
import alluxio.client.block.stream.DataWriter;
import alluxio.client.block.stream.GrpcBlockingStream;
import alluxio.client.block.stream.GrpcDataMessageBlockingStream;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.Chunk;
import alluxio.grpc.DataMessage;
import alluxio.grpc.DataMessageMarshaller;
import alluxio.grpc.RequestType;
import alluxio.grpc.WriteRequest;
import alluxio.grpc.WriteRequestCommand;
import alluxio.grpc.WriteRequestMarshaller;
import alluxio.grpc.WriteResponse;
import alluxio.network.protocol.databuffer.NettyDataBuffer;
import alluxio.proto.dataserver.Protocol;
import alluxio.resource.CloseableResource;
import alluxio.security.authorization.AccessControlList;
import alluxio.util.proto.ProtoUtils;
import alluxio.wire.WorkerNetAddress;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.protobuf.UnsafeByteOperations;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.ByteBuffer;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DataWriter} that sends write requests to a block worker over the gRPC stream
 * opened through {@link BlockWorkerClient#writeBlock}.
 *
 * <p>The writer sends one initial command request when it is constructed, then one request per
 * chunk. {@link #flush()} blocks until the worker acknowledges the queued position, and
 * {@link #close()} completes the stream and releases the worker client.
 *
 * <p>Instances are not thread safe.
 */
@NotThreadSafe
public final class GrpcDataWriter implements DataWriter {
    private static final Logger LOG = LoggerFactory.getLogger(GrpcDataWriter.class);

    /** Buffer size, in messages, handed to the underlying blocking stream. */
    private final int mWriterBufferSizeMessages;
    /** Timeout applied to every {@code send} on the stream. */
    private final long mDataTimeoutMs;
    /** Timeout used while waiting for the stream to complete in {@link #close()}. */
    private final long mWriterCloseTimeoutMs;
    /** Timeout used for each response received in {@link #flush()}. */
    private final long mWriterFlushTimeoutMs;

    private final FileSystemContext mContext;
    /** The worker client resource; released in {@link #close()}. */
    private final CloseableResource<BlockWorkerClient> mClient;
    private final WorkerNetAddress mAddress;
    private final long mLength;
    /** Command template (id, tier, type, ...) reused by every request this writer sends. */
    private final WriteRequestCommand mPartialRequest;
    private final long mChunkSize;
    private final GrpcBlockingStream<WriteRequest, WriteResponse> mStream;
    private final WriteRequestMarshaller mMarshaller;

    /** Number of bytes handed to {@link #writeChunk(ByteBuf)} so far (or reset by a fallback init). */
    private long mPosToQueue;

    /**
     * Creates a writer bound to the given worker.
     *
     * <p>A block worker client is acquired from {@code context}. If constructing the writer
     * fails, that client is closed before the exception is rethrown.
     *
     * @param context the file system context used for configuration and client acquisition
     * @param address the address of the worker to write to
     * @param id the id placed in the write request command
     * @param length the length recorded by the writer
     * @param type the request type
     * @param options the out stream options (tier, medium type, write type, UFS settings)
     * @return the new writer
     * @throws IOException if the writer cannot be constructed or the initial request cannot be sent
     */
    public static GrpcDataWriter create(FileSystemContext context, WorkerNetAddress address, long id, long length, RequestType type, OutStreamOptions options) throws IOException {
        long chunkSize = context.getClusterConf().getBytes(PropertyKey.USER_NETWORK_WRITER_CHUNK_SIZE_BYTES);
        CloseableResource<BlockWorkerClient> grpcClient = context.acquireBlockWorkerClient(address);
        try {
            return new GrpcDataWriter(context, address, id, length, chunkSize, type, options, grpcClient);
        } catch (Exception e) {
            // The writer never took ownership of the client, so release it here.
            grpcClient.close();
            throw e;
        }
    }

    /**
     * Builds the command template, opens the write stream and sends the initial command request.
     *
     * @param context the file system context
     * @param address the worker address
     * @param id the id placed in the write request command
     * @param length the length recorded by the writer
     * @param chunkSize the chunk size reported by {@link #chunkSize()}
     * @param type the request type
     * @param options the out stream options
     * @param client the worker client resource this writer takes ownership of
     * @throws IOException if the initial request cannot be sent
     */
    private GrpcDataWriter(FileSystemContext context, WorkerNetAddress address, long id, long length, long chunkSize, RequestType type, OutStreamOptions options, CloseableResource<BlockWorkerClient> client) throws IOException {
        mContext = context;
        mAddress = address;
        mLength = length;

        AlluxioConfiguration conf = context.getClusterConf();
        mDataTimeoutMs = conf.getMs(PropertyKey.USER_NETWORK_DATA_TIMEOUT_MS);
        mWriterBufferSizeMessages = conf.getInt(PropertyKey.USER_NETWORK_WRITER_BUFFER_SIZE_MESSAGES);
        mWriterCloseTimeoutMs = conf.getMs(PropertyKey.USER_NETWORK_WRITER_CLOSE_TIMEOUT_MS);
        mWriterFlushTimeoutMs = conf.getMs(PropertyKey.USER_NETWORK_WRITER_FLUSH_TIMEOUT);

        WriteRequestCommand.Builder builder = WriteRequestCommand.newBuilder()
            .setId(id)
            .setTier(options.getWriteTier())
            .setType(type)
            .setMediumType(options.getMediumType());

        if (type == RequestType.UFS_FILE) {
            Protocol.CreateUfsFileOptions ufsFileOptions = Protocol.CreateUfsFileOptions.newBuilder()
                .setUfsPath(options.getUfsPath())
                .setOwner(options.getOwner())
                .setGroup(options.getGroup())
                .setMode(options.getMode().toShort())
                .setMountId(options.getMountId())
                // Cast kept from the original code; it fixes which toProto overload is selected.
                .setAcl(ProtoUtils.toProto((AccessControlList) options.getAcl()))
                .build();
            builder.setCreateUfsFileOptions(ufsFileOptions);
        }

        boolean alreadyFallback = type == RequestType.UFS_FALLBACK_BLOCK;
        boolean possibleToFallback = type == RequestType.ALLUXIO_BLOCK
            && options.getWriteType() == WriteType.ASYNC_THROUGH
            && conf.getBoolean(PropertyKey.USER_FILE_UFS_TIER_ENABLED);
        if (alreadyFallback || possibleToFallback) {
            // An ALLUXIO_BLOCK request that may fall back is rewritten to the fallback request type;
            // the fallback flag itself is only set when the caller already asked for fallback.
            builder.setType(RequestType.UFS_FALLBACK_BLOCK);
            Protocol.CreateUfsBlockOptions ufsBlockOptions = Protocol.CreateUfsBlockOptions.newBuilder()
                .setMountId(options.getMountId())
                .setFallback(alreadyFallback)
                .build();
            builder.setCreateUfsBlockOptions(ufsBlockOptions);
        }
        builder.setPinOnCreate(options.getWriteType() == WriteType.ASYNC_THROUGH);

        mPartialRequest = builder.buildPartial();
        mChunkSize = chunkSize;
        mClient = client;
        mMarshaller = new WriteRequestMarshaller();

        // Both stream flavours are labelled with the same description, so build it once.
        String description = MoreObjects.toStringHelper(this)
            .add("request", mPartialRequest)
            .add("address", address)
            .toString();
        if (conf.getBoolean(PropertyKey.USER_NETWORK_ZEROCOPY_ENABLED)) {
            // No marshaller is supplied for responses.
            DataMessageMarshaller<WriteResponse> responseMarshaller = null;
            mStream = new GrpcDataMessageBlockingStream<WriteRequest, WriteResponse>(
                mClient.get()::writeBlock, mWriterBufferSizeMessages, description, mMarshaller, responseMarshaller);
        } else {
            // Explicit type arguments (instead of a raw constructor call) so the method reference
            // is checked against the request/response types declared on mStream.
            mStream = new GrpcBlockingStream<WriteRequest, WriteResponse>(
                mClient.get()::writeBlock, mWriterBufferSizeMessages, description);
        }

        // Announce the write to the worker before any chunk is sent.
        mStream.send(WriteRequest.newBuilder().setCommand(mPartialRequest.toBuilder()).build(), mDataTimeoutMs);
    }

    /**
     * @return the number of bytes queued for writing so far
     */
    @Override
    public long pos() {
        return mPosToQueue;
    }

    /**
     * Sends one chunk to the worker and releases the buffer.
     *
     * <p>The position is advanced by the buffer's readable bytes before the send is attempted,
     * so it includes this chunk even if the send throws. The buffer is released in all cases.
     *
     * @param buf the chunk to write; released by this method
     * @throws IOException if the request cannot be sent
     */
    @Override
    public void writeChunk(ByteBuf buf) throws IOException {
        mPosToQueue += buf.readableBytes();
        try {
            Chunk chunk = Chunk.newBuilder()
                .setData(UnsafeByteOperations.unsafeWrap(buf.nioBuffer()))
                .build();
            WriteRequest request = WriteRequest.newBuilder()
                .setCommand(mPartialRequest)
                .setChunk(chunk)
                .build();
            if (mStream instanceof GrpcDataMessageBlockingStream) {
                // Data-message stream: pass the request together with the Netty buffer.
                GrpcDataMessageBlockingStream<WriteRequest, WriteResponse> dataStream =
                    (GrpcDataMessageBlockingStream<WriteRequest, WriteResponse>) mStream;
                dataStream.sendDataMessage(new DataMessage<>(request, new NettyDataBuffer(buf)), mDataTimeoutMs);
            } else {
                mStream.send(request, mDataTimeoutMs);
            }
        } finally {
            buf.release();
        }
    }

    /**
     * Sends the request that initialises a UFS fallback block and resets the queued position.
     *
     * @param pos the number of bytes already in the block store; also becomes the new position
     * @throws IllegalStateException if this writer's request type is not
     *         {@link RequestType#UFS_FALLBACK_BLOCK}
     * @throws IOException if the request cannot be sent
     */
    public void writeFallbackInitRequest(long pos) throws IOException {
        Preconditions.checkState(mPartialRequest.getType() == RequestType.UFS_FALLBACK_BLOCK);
        Protocol.CreateUfsBlockOptions ufsBlockOptions = mPartialRequest.getCreateUfsBlockOptions().toBuilder()
            .setBytesInBlockStore(pos)
            .build();
        WriteRequest writeRequest = WriteRequest.newBuilder()
            .setCommand(mPartialRequest.toBuilder().setOffset(0L).setCreateUfsBlockOptions(ufsBlockOptions))
            .build();
        mPosToQueue = pos;
        mStream.send(writeRequest, mDataTimeoutMs);
    }

    /**
     * Cancels the stream unless the worker client is already shut down.
     *
     * <p>This does not release the worker client; that happens in {@link #close()}.
     */
    @Override
    public void cancel() {
        if (mClient.get().isShutdown()) {
            return;
        }
        mStream.cancel();
    }

    /**
     * Sends a flush command for the current position and waits for the worker to acknowledge it.
     *
     * <p>Returns immediately when the stream is closed or canceled, or when nothing has been
     * queued. Otherwise responses are received until one reports an offset equal to the queued
     * position.
     *
     * @throws UnavailableException if a receive returns no response before the ack arrives
     * @throws IOException if sending or receiving fails
     */
    @Override
    public void flush() throws IOException {
        if (mStream.isClosed() || mStream.isCanceled() || mPosToQueue == 0L) {
            return;
        }
        WriteRequest writeRequest = WriteRequest.newBuilder()
            .setCommand(mPartialRequest.toBuilder().setOffset(mPosToQueue).setFlush(true))
            .build();
        mStream.send(writeRequest, mDataTimeoutMs);

        long posWritten;
        do {
            WriteResponse response = mStream.receive(mWriterFlushTimeoutMs);
            if (response == null) {
                throw new UnavailableException(String.format("Flush request %s to worker %s is not acked before complete.", writeRequest, mAddress));
            }
            posWritten = response.getOffset();
        } while (mPosToQueue != posWritten);
    }

    /**
     * Closes the stream, waits for it to complete and releases the worker client.
     *
     * <p>If the client is already shut down the stream is left untouched, but the client
     * resource is still released.
     *
     * @throws IOException if closing the stream or waiting for completion fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (mClient.get().isShutdown()) {
                return;
            }
            mStream.close();
            mStream.waitForComplete(mWriterCloseTimeoutMs);
        } finally {
            mClient.close();
        }
    }

    /**
     * @return the configured chunk size, narrowed from {@code long} to {@code int}
     */
    @Override
    public int chunkSize() {
        return (int) mChunkSize;
    }
}

