/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import com.google.common.annotations.VisibleForTesting;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;

class BlockReceiver
implements Closeable {
    public static final Log LOG = DataNode.LOG;
    static final Log ClientTraceLog = DataNode.ClientTraceLog;
    @VisibleForTesting
    static long CACHE_DROP_LAG_BYTES = 0x800000L;
    private final long datanodeSlowLogThresholdMs;
    private DataInputStream in = null;
    private DataChecksum clientChecksum;
    private DataChecksum diskChecksum;
    private final boolean needsChecksumTranslation;
    private OutputStream out = null;
    private FileDescriptor outFd;
    private DataOutputStream checksumOut = null;
    private final int bytesPerChecksum;
    private final int checksumSize;
    private final PacketReceiver packetReceiver = new PacketReceiver(false);
    protected final String inAddr;
    protected final String myAddr;
    private String mirrorAddr;
    private DataOutputStream mirrorOut;
    private Daemon responder = null;
    private DataTransferThrottler throttler;
    private ReplicaOutputStreams streams;
    private DatanodeInfo srcDataNode = null;
    private final DataNode datanode;
    private volatile boolean mirrorError;
    private boolean dropCacheBehindWrites;
    private long lastCacheManagementOffset = 0L;
    private boolean syncBehindWrites;
    private boolean syncBehindWritesInBackground;
    private final String clientname;
    private final boolean isClient;
    private final boolean isDatanode;
    private final ExtendedBlock block;
    private final ReplicaInPipelineInterface replicaInfo;
    private final BlockConstructionStage stage;
    private final boolean isTransfer;
    private boolean syncOnClose;
    private long restartBudget;
    private ReplicaHandler replicaHandler;
    private final long responseInterval;
    private long lastResponseTime = 0L;
    private boolean isReplaceBlock = false;
    private DataOutputStream replyOut = null;
    private boolean pinning;
    private long lastSentTime;
    private long maxSendIdleTime;

    BlockReceiver(ExtendedBlock block, StorageType storageType, DataInputStream in, String inAddr, String myAddr, BlockConstructionStage stage, long newGs, long minBytesRcvd, long maxBytesRcvd, String clientname, DatanodeInfo srcDataNode, DataNode datanode, DataChecksum requestedChecksum, CachingStrategy cachingStrategy, boolean allowLazyPersist, boolean pinning) throws IOException {
        try {
            this.block = block;
            this.in = in;
            this.inAddr = inAddr;
            this.myAddr = myAddr;
            this.srcDataNode = srcDataNode;
            this.datanode = datanode;
            this.clientname = clientname;
            this.isDatanode = clientname.length() == 0;
            this.isClient = !this.isDatanode;
            this.restartBudget = datanode.getDnConf().restartReplicaExpiry;
            this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
            long readTimeout = datanode.getDnConf().socketTimeout;
            this.responseInterval = (long)((double)readTimeout * 0.5);
            this.stage = stage;
            this.isTransfer = stage == BlockConstructionStage.TRANSFER_RBW || stage == BlockConstructionStage.TRANSFER_FINALIZED;
            this.pinning = pinning;
            this.lastSentTime = Time.monotonicNow();
            this.maxSendIdleTime = (long)((double)readTimeout * 0.9);
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.getClass().getSimpleName() + ": " + block + "\n  isClient  =" + this.isClient + ", clientname=" + clientname + "\n  isDatanode=" + this.isDatanode + ", srcDataNode=" + srcDataNode + "\n  inAddr=" + inAddr + ", myAddr=" + myAddr + "\n  cachingStrategy = " + cachingStrategy + "\n  pinning=" + pinning);
            }
            if (this.isDatanode) {
                this.replicaHandler = datanode.data.createTemporary(storageType, block);
            } else {
                switch (stage) {
                    case PIPELINE_SETUP_CREATE: {
                        this.replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist);
                        datanode.notifyNamenodeReceivingBlock(block, this.replicaHandler.getReplica().getStorageUuid());
                        break;
                    }
                    case PIPELINE_SETUP_STREAMING_RECOVERY: {
                        this.replicaHandler = datanode.data.recoverRbw(block, newGs, minBytesRcvd, maxBytesRcvd);
                        block.setGenerationStamp(newGs);
                        break;
                    }
                    case PIPELINE_SETUP_APPEND: {
                        this.replicaHandler = datanode.data.append(block, newGs, minBytesRcvd);
                        block.setGenerationStamp(newGs);
                        datanode.notifyNamenodeReceivingBlock(block, this.replicaHandler.getReplica().getStorageUuid());
                        break;
                    }
                    case PIPELINE_SETUP_APPEND_RECOVERY: {
                        this.replicaHandler = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
                        block.setGenerationStamp(newGs);
                        datanode.notifyNamenodeReceivingBlock(block, this.replicaHandler.getReplica().getStorageUuid());
                        break;
                    }
                    case TRANSFER_RBW: 
                    case TRANSFER_FINALIZED: {
                        this.replicaHandler = datanode.data.createTemporary(storageType, block);
                        break;
                    }
                    default: {
                        throw new IOException("Unsupported stage " + (Object)((Object)stage) + " while receiving block " + block + " from " + inAddr);
                    }
                }
            }
            this.replicaInfo = this.replicaHandler.getReplica();
            this.dropCacheBehindWrites = cachingStrategy.getDropBehind() == null ? datanode.getDnConf().dropCacheBehindWrites : cachingStrategy.getDropBehind();
            this.syncBehindWrites = datanode.getDnConf().syncBehindWrites;
            this.syncBehindWritesInBackground = datanode.getDnConf().syncBehindWritesInBackground;
            boolean isCreate = this.isDatanode || this.isTransfer || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
            this.streams = this.replicaInfo.createStreams(isCreate, requestedChecksum);
            assert (this.streams != null) : "null streams!";
            this.clientChecksum = requestedChecksum;
            this.diskChecksum = this.streams.getChecksum();
            this.needsChecksumTranslation = !this.clientChecksum.equals(this.diskChecksum);
            this.bytesPerChecksum = this.diskChecksum.getBytesPerChecksum();
            this.checksumSize = this.diskChecksum.getChecksumSize();
            this.out = this.streams.getDataOut();
            if (this.out instanceof FileOutputStream) {
                this.outFd = ((FileOutputStream)this.out).getFD();
            } else {
                LOG.warn("Could not get file descriptor for outputstream of class " + this.out.getClass());
            }
            this.checksumOut = new DataOutputStream(new BufferedOutputStream(this.streams.getChecksumOut(), HdfsConstants.SMALL_BUFFER_SIZE));
            if (isCreate) {
                BlockMetadataHeader.writeHeader(this.checksumOut, this.diskChecksum);
            }
        }
        catch (ReplicaAlreadyExistsException bae) {
            throw bae;
        }
        catch (ReplicaNotFoundException bne) {
            throw bne;
        }
        catch (IOException ioe) {
            IOUtils.closeStream(this);
            this.cleanupBlock();
            IOException cause = DatanodeUtil.getCauseIfDiskError(ioe);
            DataNode.LOG.warn("IOException in BlockReceiver constructor. Cause is ", cause);
            if (cause != null) {
                ioe = cause;
                datanode.checkDiskErrorAsync();
            }
            throw ioe;
        }
    }

    DataNode getDataNode() {
        return this.datanode;
    }

    String getStorageUuid() {
        return this.replicaInfo.getStorageUuid();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws IOException {
        long fsyncStartNanos;
        long flushEndNanos;
        long flushStartNanos;
        this.packetReceiver.close();
        IOException ioe = null;
        if (this.syncOnClose && (this.out != null || this.checksumOut != null)) {
            this.datanode.metrics.incrFsyncCount();
        }
        long flushTotalNanos = 0L;
        boolean measuredFlushTime = false;
        try {
            if (this.checksumOut != null) {
                flushStartNanos = System.nanoTime();
                this.checksumOut.flush();
                flushEndNanos = System.nanoTime();
                if (this.syncOnClose) {
                    fsyncStartNanos = flushEndNanos;
                    this.streams.syncChecksumOut();
                    this.datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
                }
                flushTotalNanos += flushEndNanos - flushStartNanos;
                measuredFlushTime = true;
                this.checksumOut.close();
                this.checksumOut = null;
            }
        }
        catch (IOException e) {
            ioe = e;
        }
        finally {
            IOUtils.closeStream(this.checksumOut);
        }
        try {
            if (this.out != null) {
                flushStartNanos = System.nanoTime();
                this.out.flush();
                flushEndNanos = System.nanoTime();
                if (this.syncOnClose) {
                    fsyncStartNanos = flushEndNanos;
                    this.streams.syncDataOut();
                    this.datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
                }
                flushTotalNanos += flushEndNanos - flushStartNanos;
                measuredFlushTime = true;
                this.out.close();
                this.out = null;
            }
        }
        catch (IOException e) {
            ioe = e;
        }
        finally {
            IOUtils.closeStream(this.out);
        }
        if (this.replicaHandler != null) {
            IOUtils.cleanup(null, this.replicaHandler);
            this.replicaHandler = null;
        }
        if (measuredFlushTime) {
            this.datanode.metrics.addFlushNanos(flushTotalNanos);
        }
        if (ioe != null) {
            this.datanode.checkDiskErrorAsync();
            throw ioe;
        }
    }

    synchronized void setLastSentTime(long sentTime) {
        this.lastSentTime = sentTime;
    }

    synchronized boolean packetSentInTime() {
        long diff = Time.monotonicNow() - this.lastSentTime;
        if (diff > this.maxSendIdleTime) {
            LOG.info("A packet was last sent " + diff + " milliseconds ago.");
            return false;
        }
        return true;
    }

    void flushOrSync(boolean isSync) throws IOException {
        long duration;
        long fsyncStartNanos;
        long flushEndNanos;
        long flushStartNanos;
        long flushTotalNanos = 0L;
        long begin = Time.monotonicNow();
        if (this.checksumOut != null) {
            flushStartNanos = System.nanoTime();
            this.checksumOut.flush();
            flushEndNanos = System.nanoTime();
            if (isSync) {
                fsyncStartNanos = flushEndNanos;
                this.streams.syncChecksumOut();
                this.datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
            }
            flushTotalNanos += flushEndNanos - flushStartNanos;
        }
        if (this.out != null) {
            flushStartNanos = System.nanoTime();
            this.out.flush();
            flushEndNanos = System.nanoTime();
            if (isSync) {
                fsyncStartNanos = flushEndNanos;
                this.streams.syncDataOut();
                this.datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
            }
            flushTotalNanos += flushEndNanos - flushStartNanos;
        }
        if (this.checksumOut != null || this.out != null) {
            this.datanode.metrics.addFlushNanos(flushTotalNanos);
            if (isSync) {
                this.datanode.metrics.incrFsyncCount();
            }
        }
        if ((duration = Time.monotonicNow() - begin) > this.datanodeSlowLogThresholdMs) {
            LOG.warn("Slow flushOrSync took " + duration + "ms (threshold=" + this.datanodeSlowLogThresholdMs + "ms), isSync:" + isSync + ", flushTotalNanos=" + flushTotalNanos + "ns");
        }
    }

    private void handleMirrorOutError(IOException ioe) throws IOException {
        String bpid = this.block.getBlockPoolId();
        LOG.info(this.datanode.getDNRegistrationForBP(bpid) + ":Exception writing " + this.block + " to mirror " + this.mirrorAddr, ioe);
        if (Thread.interrupted()) {
            throw ioe;
        }
        this.mirrorError = true;
    }

    private void verifyChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf) throws IOException {
        try {
            this.clientChecksum.verifyChunkedSums(dataBuf, checksumBuf, this.clientname, 0L);
        }
        catch (ChecksumException ce) {
            LOG.warn("Checksum error in block " + this.block + " from " + this.inAddr, ce);
            if (this.srcDataNode != null && this.isDatanode) {
                try {
                    LOG.info("report corrupt " + this.block + " from datanode " + this.srcDataNode + " to namenode");
                    this.datanode.reportRemoteBadBlock(this.srcDataNode, this.block);
                }
                catch (IOException e) {
                    LOG.warn("Failed to report bad " + this.block + " from datanode " + this.srcDataNode + " to namenode");
                }
            }
            throw new IOException("Unexpected checksum mismatch while writing " + this.block + " from " + this.inAddr);
        }
    }

    private void translateChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf) {
        this.diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
    }

    private boolean shouldVerifyChecksum() {
        return this.mirrorOut == null || this.isDatanode || this.needsChecksumTranslation;
    }

    private int receivePacket() throws IOException {
        this.packetReceiver.receiveNextPacket(this.in);
        PacketHeader header = this.packetReceiver.getHeader();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Receiving one packet for block " + this.block + ": " + header);
        }
        if (header.getOffsetInBlock() > this.replicaInfo.getNumBytes()) {
            throw new IOException("Received an out-of-sequence packet for " + this.block + "from " + this.inAddr + " at offset " + header.getOffsetInBlock() + ". Expecting packet starting at " + this.replicaInfo.getNumBytes());
        }
        if (header.getDataLen() < 0) {
            throw new IOException("Got wrong length during writeBlock(" + this.block + ") from " + this.inAddr + " at offset " + header.getOffsetInBlock() + ": " + header.getDataLen());
        }
        long offsetInBlock = header.getOffsetInBlock();
        long seqno = header.getSeqno();
        boolean lastPacketInBlock = header.isLastPacketInBlock();
        int len = header.getDataLen();
        boolean syncBlock = header.getSyncBlock();
        if (syncBlock && lastPacketInBlock) {
            this.syncOnClose = false;
        }
        long firstByteInBlock = offsetInBlock;
        if (this.replicaInfo.getNumBytes() < (offsetInBlock += (long)len)) {
            this.replicaInfo.setNumBytes(offsetInBlock);
        }
        if (this.responder != null && !syncBlock && !this.shouldVerifyChecksum()) {
            ((PacketResponder)this.responder.getRunnable()).enqueue(seqno, lastPacketInBlock, offsetInBlock, DataTransferProtos.Status.SUCCESS);
        }
        if (seqno < 0L && len == 0 && DataNodeFaultInjector.get().dropHeartbeatPacket()) {
            return 0;
        }
        if (this.mirrorOut != null && !this.mirrorError) {
            try {
                long begin = Time.monotonicNow();
                this.packetReceiver.mirrorPacketTo(this.mirrorOut);
                this.mirrorOut.flush();
                long now = Time.monotonicNow();
                this.setLastSentTime(now);
                long duration = now - begin;
                if (duration > this.datanodeSlowLogThresholdMs) {
                    LOG.warn("Slow BlockReceiver write packet to mirror took " + duration + "ms (threshold=" + this.datanodeSlowLogThresholdMs + "ms)");
                }
            }
            catch (IOException e) {
                this.handleMirrorOutError(e);
            }
        }
        ByteBuffer dataBuf = this.packetReceiver.getDataSlice();
        ByteBuffer checksumBuf = this.packetReceiver.getChecksumSlice();
        if (lastPacketInBlock || len == 0) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Receiving an empty packet or the end of the block " + this.block);
            }
            if (syncBlock) {
                this.flushOrSync(true);
            }
        } else {
            int checksumLen = this.diskChecksum.getChecksumSize(len);
            int checksumReceivedLen = checksumBuf.capacity();
            if (checksumReceivedLen > 0 && checksumReceivedLen != checksumLen) {
                throw new IOException("Invalid checksum length: received length is " + checksumReceivedLen + " but expected length is " + checksumLen);
            }
            if (checksumReceivedLen > 0 && this.shouldVerifyChecksum()) {
                try {
                    this.verifyChunks(dataBuf, checksumBuf);
                }
                catch (IOException ioe) {
                    if (this.responder != null) {
                        try {
                            ((PacketResponder)this.responder.getRunnable()).enqueue(seqno, lastPacketInBlock, offsetInBlock, DataTransferProtos.Status.ERROR_CHECKSUM);
                            Thread.sleep(3000L);
                        }
                        catch (InterruptedException interruptedException) {
                            // empty catch block
                        }
                    }
                    throw new IOException("Terminating due to a checksum error." + ioe);
                }
                if (this.needsChecksumTranslation) {
                    this.translateChunks(dataBuf, checksumBuf);
                }
            }
            if (checksumReceivedLen == 0 && !this.streams.isTransientStorage()) {
                checksumBuf = ByteBuffer.allocate(checksumLen);
                this.diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
            }
            boolean shouldNotWriteChecksum = checksumReceivedLen == 0 && this.streams.isTransientStorage();
            try {
                long onDiskLen = this.replicaInfo.getBytesOnDisk();
                if (onDiskLen < offsetInBlock) {
                    byte[] lastCrc;
                    boolean doCrcRecalc;
                    long partialChunkSizeOnDisk = onDiskLen % (long)this.bytesPerChecksum;
                    long lastChunkBoundary = onDiskLen - partialChunkSizeOnDisk;
                    boolean alignedOnDisk = partialChunkSizeOnDisk == 0L;
                    boolean alignedInPacket = firstByteInBlock % (long)this.bytesPerChecksum == 0L;
                    boolean overwriteLastCrc = !alignedOnDisk && !shouldNotWriteChecksum;
                    boolean bl = doCrcRecalc = overwriteLastCrc && lastChunkBoundary != firstByteInBlock;
                    if (!alignedInPacket && len > this.bytesPerChecksum) {
                        throw new IOException("Unexpected packet data length for " + this.block + " from " + this.inAddr + ": a partial chunk must be " + " sent in an individual packet (data length = " + len + " > bytesPerChecksum = " + this.bytesPerChecksum + ")");
                    }
                    Checksum partialCrc = null;
                    if (doCrcRecalc) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("receivePacket for " + this.block + ": previous write did not end at the chunk boundary." + " onDiskLen=" + onDiskLen);
                        }
                        long offsetInChecksum = (long)BlockMetadataHeader.getHeaderSize() + onDiskLen / (long)this.bytesPerChecksum * (long)this.checksumSize;
                        partialCrc = this.computePartialChunkCrc(onDiskLen, offsetInChecksum);
                    }
                    int startByteToDisk = (int)(onDiskLen - firstByteInBlock) + dataBuf.arrayOffset() + dataBuf.position();
                    int numBytesToDisk = (int)(offsetInBlock - onDiskLen);
                    long begin = Time.monotonicNow();
                    this.out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
                    long duration = Time.monotonicNow() - begin;
                    if (duration > this.datanodeSlowLogThresholdMs) {
                        LOG.warn("Slow BlockReceiver write data to disk cost:" + duration + "ms (threshold=" + this.datanodeSlowLogThresholdMs + "ms)");
                    }
                    if (shouldNotWriteChecksum) {
                        lastCrc = null;
                    } else {
                        int end;
                        int offset;
                        long skippedDataBytes;
                        int skip = 0;
                        byte[] crcBytes = null;
                        if (overwriteLastCrc) {
                            this.adjustCrcFilePosition();
                        }
                        if (doCrcRecalc) {
                            int bytesToReadForRecalc = (int)((long)this.bytesPerChecksum - partialChunkSizeOnDisk);
                            if (numBytesToDisk < bytesToReadForRecalc) {
                                bytesToReadForRecalc = numBytesToDisk;
                            }
                            partialCrc.update(dataBuf.array(), startByteToDisk, bytesToReadForRecalc);
                            byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, this.checksumSize);
                            crcBytes = BlockReceiver.copyLastChunkChecksum(buf, this.checksumSize, buf.length);
                            this.checksumOut.write(buf);
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Writing out partial crc for data len " + len + ", skip=" + skip);
                            }
                            ++skip;
                        }
                        if ((skippedDataBytes = lastChunkBoundary - firstByteInBlock) > 0L) {
                            skip += (int)(skippedDataBytes / (long)this.bytesPerChecksum) + (skippedDataBytes % (long)this.bytesPerChecksum == 0L ? 0 : 1);
                        }
                        if ((offset = checksumBuf.arrayOffset() + checksumBuf.position() + (skip *= this.checksumSize)) >= (end = offset + checksumLen - skip) && doCrcRecalc) {
                            lastCrc = crcBytes;
                        } else {
                            int remainingBytes = checksumLen - skip;
                            lastCrc = BlockReceiver.copyLastChunkChecksum(checksumBuf.array(), this.checksumSize, end);
                            this.checksumOut.write(checksumBuf.array(), offset, remainingBytes);
                        }
                    }
                    this.flushOrSync(syncBlock);
                    this.replicaInfo.setLastChecksumAndDataLen(offsetInBlock, lastCrc);
                    this.datanode.metrics.incrBytesWritten(len);
                    this.datanode.metrics.incrTotalWriteTime(duration);
                    this.manageWriterOsCache(offsetInBlock);
                }
            }
            catch (IOException iex) {
                this.datanode.checkDiskErrorAsync();
                throw iex;
            }
        }
        if (this.responder != null && (syncBlock || this.shouldVerifyChecksum())) {
            ((PacketResponder)this.responder.getRunnable()).enqueue(seqno, lastPacketInBlock, offsetInBlock, DataTransferProtos.Status.SUCCESS);
        }
        if (this.isReplaceBlock && Time.monotonicNow() - this.lastResponseTime > this.responseInterval) {
            DataTransferProtos.BlockOpResponseProto.Builder response = DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(DataTransferProtos.Status.IN_PROGRESS);
            response.build().writeDelimitedTo(this.replyOut);
            this.replyOut.flush();
            this.lastResponseTime = Time.monotonicNow();
        }
        if (this.throttler != null) {
            this.throttler.throttle(len);
        }
        return lastPacketInBlock ? -1 : len;
    }

    private static byte[] copyLastChunkChecksum(byte[] array, int size, int end) {
        return Arrays.copyOfRange(array, end - size, end);
    }

    private void manageWriterOsCache(long offsetInBlock) {
        try {
            if (this.outFd != null && offsetInBlock > this.lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
                long dropPos;
                long begin = Time.monotonicNow();
                if (this.syncBehindWrites) {
                    if (this.syncBehindWritesInBackground) {
                        this.datanode.getFSDataset().submitBackgroundSyncFileRangeRequest(this.block, this.outFd, this.lastCacheManagementOffset, offsetInBlock - this.lastCacheManagementOffset, 2);
                    } else {
                        NativeIO.POSIX.syncFileRangeIfPossible(this.outFd, this.lastCacheManagementOffset, offsetInBlock - this.lastCacheManagementOffset, 2);
                    }
                }
                if ((dropPos = this.lastCacheManagementOffset - CACHE_DROP_LAG_BYTES) > 0L && this.dropCacheBehindWrites) {
                    NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(this.block.getBlockName(), this.outFd, 0L, dropPos, 4);
                }
                this.lastCacheManagementOffset = offsetInBlock;
                long duration = Time.monotonicNow() - begin;
                if (duration > this.datanodeSlowLogThresholdMs) {
                    LOG.warn("Slow manageWriterOsCache took " + duration + "ms (threshold=" + this.datanodeSlowLogThresholdMs + "ms)");
                }
            }
        }
        catch (Throwable t) {
            LOG.warn("Error managing cache for writer of block " + this.block, t);
        }
    }

    public void sendOOB() throws IOException, InterruptedException {
        ((PacketResponder)this.responder.getRunnable()).sendOOBResponse(PipelineAck.getRestartOOBStatus());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    void receiveBlock(DataOutputStream mirrOut, DataInputStream mirrIn, DataOutputStream replyOut, String mirrAddr, DataTransferThrottler throttlerArg, DatanodeInfo[] downstreams, boolean isReplaceBlock) throws IOException {
        block96: {
            boolean responderClosed;
            block101: {
                block102: {
                    block103: {
                        block90: {
                            this.syncOnClose = this.datanode.getDnConf().syncOnClose;
                            responderClosed = false;
                            this.mirrorOut = mirrOut;
                            this.mirrorAddr = mirrAddr;
                            this.throttler = throttlerArg;
                            this.replyOut = replyOut;
                            this.isReplaceBlock = isReplaceBlock;
                            if (this.isClient && !this.isTransfer) {
                                this.responder = new Daemon(this.datanode.threadGroup, new PacketResponder(replyOut, mirrIn, downstreams));
                                this.responder.start();
                            }
                            while (this.receivePacket() >= 0) {
                            }
                            if (this.responder != null) {
                                ((PacketResponder)this.responder.getRunnable()).close();
                                responderClosed = true;
                            }
                            if (!this.isDatanode && !this.isTransfer) break block90;
                            try (ReplicaHandler handler = this.claimReplicaHandler();){
                                this.close();
                                this.block.setNumBytes(this.replicaInfo.getNumBytes());
                                if (this.stage == BlockConstructionStage.TRANSFER_RBW) {
                                    this.datanode.data.convertTemporaryToRbw(this.block);
                                } else {
                                    this.datanode.data.finalizeBlock(this.block);
                                }
                            }
                            this.datanode.metrics.incrBlocksWritten();
                        }
                        Thread.interrupted();
                        if (responderClosed) break block101;
                        if (this.responder == null) break block102;
                        if (!this.datanode.isRestarting() || !this.isClient || this.isTransfer) break block103;
                        File blockFile = ((ReplicaInPipeline)this.replicaInfo).getBlockFile();
                        File restartMeta = new File(blockFile.getParent() + File.pathSeparator + "." + blockFile.getName() + ".restart");
                        if (restartMeta.exists() && !restartMeta.delete()) {
                            LOG.warn("Failed to delete restart meta file: " + restartMeta.getPath());
                        }
                        try (OutputStreamWriter out2222222222 = new OutputStreamWriter((OutputStream)new FileOutputStream(restartMeta), "UTF-8");){
                            out2222222222.write(Long.toString(Time.now() + this.restartBudget));
                            ((Writer)out2222222222).flush();
                        }
                        catch (IOException out2222222222) {
                            IOUtils.cleanup(LOG, this.out);
                            catch (Throwable throwable) {
                                IOUtils.cleanup(LOG, this.out);
                                throw throwable;
                            }
                        }
                        IOUtils.cleanup(LOG, this.out);
                        try {
                            Thread.sleep(1000L);
                        }
                        catch (InterruptedException out2222222222) {
                            // empty catch block
                        }
                    }
                    this.responder.interrupt();
                }
                IOUtils.closeStream(this);
                this.cleanupBlock();
            }
            if (this.responder != null) {
                block93: {
                    try {
                        this.responder.interrupt();
                        long joinTimeout = this.datanode.getDnConf().getXceiverStopTimeout();
                        joinTimeout = joinTimeout > 1L ? joinTimeout * 8L / 10L : joinTimeout;
                        this.responder.join(joinTimeout);
                        if (this.responder.isAlive()) {
                            String msg = "Join on responder thread " + this.responder + " timed out";
                            LOG.warn(msg + "\n" + StringUtils.getStackTrace(this.responder));
                            throw new IOException(msg);
                        }
                    }
                    catch (InterruptedException e) {
                        this.responder.interrupt();
                        if (this.datanode.isRestarting()) break block93;
                        throw new IOException("Interrupted receiveBlock");
                    }
                }
                this.responder = null;
            }
            break block96;
            catch (IOException ioe) {
                block97: {
                    block108: {
                        block109: {
                            block110: {
                                block95: {
                                    try {
                                        this.replicaInfo.releaseAllBytesReserved();
                                        if (!this.datanode.isRestarting()) {
                                            LOG.info("Exception for " + this.block, ioe);
                                            throw ioe;
                                        }
                                        LOG.info("Shutting down for restart (" + this.block + ").");
                                    }
                                    catch (Throwable throwable) {
                                        block105: {
                                            block106: {
                                                block107: {
                                                    block99: {
                                                        Thread.interrupted();
                                                        if (responderClosed) break block105;
                                                        if (this.responder == null) break block106;
                                                        if (!this.datanode.isRestarting() || !this.isClient || this.isTransfer) break block107;
                                                        File blockFile = ((ReplicaInPipeline)this.replicaInfo).getBlockFile();
                                                        File restartMeta = new File(blockFile.getParent() + File.pathSeparator + "." + blockFile.getName() + ".restart");
                                                        if (restartMeta.exists() && !restartMeta.delete()) {
                                                            LOG.warn("Failed to delete restart meta file: " + restartMeta.getPath());
                                                        }
                                                        try (OutputStreamWriter out3222222222 = new OutputStreamWriter((OutputStream)new FileOutputStream(restartMeta), "UTF-8");){
                                                            out3222222222.write(Long.toString(Time.now() + this.restartBudget));
                                                            ((Writer)out3222222222).flush();
                                                        }
                                                        catch (IOException out3222222222) {
                                                            IOUtils.cleanup(LOG, this.out);
                                                            break block99;
                                                            catch (Throwable throwable2) {
                                                                IOUtils.cleanup(LOG, this.out);
                                                                throw throwable2;
                                                            }
                                                        }
                                                        IOUtils.cleanup(LOG, this.out);
                                                    }
                                                    try {
                                                        Thread.sleep(1000L);
                                                    }
                                                    catch (InterruptedException out3222222222) {
                                                        // empty catch block
                                                    }
                                                }
                                                this.responder.interrupt();
                                            }
                                            IOUtils.closeStream(this);
                                            this.cleanupBlock();
                                        }
                                        if (this.responder != null) {
                                            block100: {
                                                try {
                                                    this.responder.interrupt();
                                                    long joinTimeout = this.datanode.getDnConf().getXceiverStopTimeout();
                                                    joinTimeout = joinTimeout > 1L ? joinTimeout * 8L / 10L : joinTimeout;
                                                    this.responder.join(joinTimeout);
                                                    if (this.responder.isAlive()) {
                                                        String msg = "Join on responder thread " + this.responder + " timed out";
                                                        LOG.warn(msg + "\n" + StringUtils.getStackTrace(this.responder));
                                                        throw new IOException(msg);
                                                    }
                                                }
                                                catch (InterruptedException e) {
                                                    this.responder.interrupt();
                                                    if (this.datanode.isRestarting()) break block100;
                                                    throw new IOException("Interrupted receiveBlock");
                                                }
                                            }
                                            this.responder = null;
                                        }
                                        throw throwable;
                                    }
                                    Thread.interrupted();
                                    if (responderClosed) break block108;
                                    if (this.responder == null) break block109;
                                    if (!this.datanode.isRestarting() || !this.isClient || this.isTransfer) break block110;
                                    File blockFile = ((ReplicaInPipeline)this.replicaInfo).getBlockFile();
                                    File restartMeta = new File(blockFile.getParent() + File.pathSeparator + "." + blockFile.getName() + ".restart");
                                    if (restartMeta.exists() && !restartMeta.delete()) {
                                        LOG.warn("Failed to delete restart meta file: " + restartMeta.getPath());
                                    }
                                    try (OutputStreamWriter out4222222222 = new OutputStreamWriter((OutputStream)new FileOutputStream(restartMeta), "UTF-8");){
                                        out4222222222.write(Long.toString(Time.now() + this.restartBudget));
                                        ((Writer)out4222222222).flush();
                                    }
                                    catch (IOException out4222222222) {
                                        IOUtils.cleanup(LOG, this.out);
                                        break block95;
                                        catch (Throwable throwable) {
                                            IOUtils.cleanup(LOG, this.out);
                                            throw throwable;
                                        }
                                    }
                                    IOUtils.cleanup(LOG, this.out);
                                }
                                try {
                                    Thread.sleep(1000L);
                                }
                                catch (InterruptedException out4222222222) {
                                    // empty catch block
                                }
                            }
                            this.responder.interrupt();
                        }
                        IOUtils.closeStream(this);
                        this.cleanupBlock();
                    }
                    if (this.responder == null) break block96;
                    try {
                        this.responder.interrupt();
                        long joinTimeout = this.datanode.getDnConf().getXceiverStopTimeout();
                        joinTimeout = joinTimeout > 1L ? joinTimeout * 8L / 10L : joinTimeout;
                        this.responder.join(joinTimeout);
                        if (this.responder.isAlive()) {
                            String msg = "Join on responder thread " + this.responder + " timed out";
                            LOG.warn(msg + "\n" + StringUtils.getStackTrace(this.responder));
                            throw new IOException(msg);
                        }
                    }
                    catch (InterruptedException e) {
                        this.responder.interrupt();
                        if (this.datanode.isRestarting()) break block97;
                        throw new IOException("Interrupted receiveBlock");
                    }
                }
                this.responder = null;
            }
        }
    }

    private void cleanupBlock() throws IOException {
        if (this.isDatanode) {
            this.datanode.data.unfinalizeBlock(this.block);
        }
    }

    private void adjustCrcFilePosition() throws IOException {
        if (this.out != null) {
            this.out.flush();
        }
        if (this.checksumOut != null) {
            this.checksumOut.flush();
        }
        this.datanode.data.adjustCrcChannelPosition(this.block, this.streams, this.checksumSize);
    }

    private static long checksum2long(byte[] checksum) {
        long crc = 0L;
        for (int i = 0; i < checksum.length; ++i) {
            crc |= (0xFFL & (long)checksum[i]) << (checksum.length - i - 1) * 8;
        }
        return crc;
    }

    private Checksum computePartialChunkCrc(long blkoff, long ckoff) throws IOException {
        int sizePartialChunk = (int)(blkoff % (long)this.bytesPerChecksum);
        blkoff -= (long)sizePartialChunk;
        if (LOG.isDebugEnabled()) {
            LOG.debug("computePartialChunkCrc for " + this.block + ": sizePartialChunk=" + sizePartialChunk + ", block offset=" + blkoff + ", metafile offset=" + ckoff);
        }
        byte[] buf = new byte[sizePartialChunk];
        byte[] crcbuf = new byte[this.checksumSize];
        try (ReplicaInputStreams instr = this.datanode.data.getTmpInputStreams(this.block, blkoff, ckoff);){
            IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk);
            IOUtils.readFully(instr.getChecksumIn(), crcbuf, 0, crcbuf.length);
        }
        DataChecksum partialCrc = DataChecksum.newDataChecksum(this.diskChecksum.getChecksumType(), this.diskChecksum.getBytesPerChecksum());
        partialCrc.update(buf, 0, sizePartialChunk);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Read in partial CRC chunk from disk for " + this.block);
        }
        if (partialCrc.getValue() != BlockReceiver.checksum2long(crcbuf)) {
            String msg = "Partial CRC " + partialCrc.getValue() + " does not match value computed the " + " last time file was closed " + BlockReceiver.checksum2long(crcbuf);
            throw new IOException(msg);
        }
        return partialCrc;
    }

    private ReplicaHandler claimReplicaHandler() {
        ReplicaHandler handler = this.replicaHandler;
        this.replicaHandler = null;
        return handler;
    }

    private static class Packet {
        final long seqno;
        final boolean lastPacketInBlock;
        final long offsetInBlock;
        final long ackEnqueueNanoTime;
        final DataTransferProtos.Status ackStatus;

        Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock, long ackEnqueueNanoTime, DataTransferProtos.Status ackStatus) {
            this.seqno = seqno;
            this.lastPacketInBlock = lastPacketInBlock;
            this.offsetInBlock = offsetInBlock;
            this.ackEnqueueNanoTime = ackEnqueueNanoTime;
            this.ackStatus = ackStatus;
        }

        public String toString() {
            return this.getClass().getSimpleName() + "(seqno=" + this.seqno + ", lastPacketInBlock=" + this.lastPacketInBlock + ", offsetInBlock=" + this.offsetInBlock + ", ackEnqueueNanoTime=" + this.ackEnqueueNanoTime + ", ackStatus=" + this.ackStatus + ")";
        }
    }

    class PacketResponder
    implements Runnable,
    Closeable {
        private final LinkedList<Packet> ackQueue = new LinkedList();
        private final Thread receiverThread = Thread.currentThread();
        private volatile boolean running = true;
        private final DataInputStream downstreamIn;
        private final DataOutputStream upstreamOut;
        private final PacketResponderType type;
        private final String myString;
        private boolean sending = false;

        public String toString() {
            return this.myString;
        }

        PacketResponder(DataOutputStream upstreamOut, DataInputStream downstreamIn, DatanodeInfo[] downstreams) {
            this.downstreamIn = downstreamIn;
            this.upstreamOut = upstreamOut;
            this.type = downstreams == null ? PacketResponderType.NON_PIPELINE : (downstreams.length == 0 ? PacketResponderType.LAST_IN_PIPELINE : PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE);
            StringBuilder b = new StringBuilder(this.getClass().getSimpleName()).append(": ").append(BlockReceiver.this.block).append(", type=").append((Object)this.type);
            if (this.type != PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
                b.append(", downstreams=").append(downstreams.length).append(":").append(Arrays.asList(downstreams));
            }
            this.myString = b.toString();
        }

        private boolean isRunning() {
            return this.running && (((BlockReceiver)BlockReceiver.this).datanode.shouldRun || BlockReceiver.this.datanode.isRestarting());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void enqueue(long seqno, boolean lastPacketInBlock, long offsetInBlock, DataTransferProtos.Status ackStatus) {
            Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock, System.nanoTime(), ackStatus);
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.myString + ": enqueue " + p);
            }
            LinkedList<Packet> linkedList = this.ackQueue;
            synchronized (linkedList) {
                if (this.running) {
                    this.ackQueue.addLast(p);
                    this.ackQueue.notifyAll();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void sendOOBResponse(DataTransferProtos.Status ackStatus) throws IOException, InterruptedException {
            if (!this.running) {
                LOG.info("Cannot send OOB response " + ackStatus + ". Responder not running.");
                return;
            }
            PacketResponder packetResponder = this;
            synchronized (packetResponder) {
                if (this.sending) {
                    this.wait(PipelineAck.getOOBTimeout(ackStatus));
                    if (this.sending) {
                        throw new IOException("Could not send OOB reponse in time: " + ackStatus);
                    }
                }
                this.sending = true;
            }
            LOG.info("Sending an out of band ack of type " + ackStatus);
            try {
                this.sendAckUpstreamUnprotected(null, -2L, 0L, 0L, PipelineAck.combineHeader(BlockReceiver.this.datanode.getECN(), ackStatus));
            }
            finally {
                packetResponder = this;
                synchronized (packetResponder) {
                    this.sending = false;
                    this.notify();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        Packet waitForAckHead(long seqno) throws InterruptedException {
            LinkedList<Packet> linkedList = this.ackQueue;
            synchronized (linkedList) {
                while (this.isRunning() && this.ackQueue.size() == 0) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(this.myString + ": seqno=" + seqno + " waiting for local datanode to finish write.");
                    }
                    this.ackQueue.wait();
                }
                return this.isRunning() ? this.ackQueue.getFirst() : null;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void close() {
            Object object = this.ackQueue;
            synchronized (object) {
                while (this.isRunning() && this.ackQueue.size() != 0) {
                    try {
                        this.ackQueue.wait();
                    }
                    catch (InterruptedException e) {
                        this.running = false;
                        Thread.currentThread().interrupt();
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug(this.myString + ": closing");
                }
                this.running = false;
                this.ackQueue.notifyAll();
            }
            object = this;
            synchronized (object) {
                this.running = false;
                this.notifyAll();
            }
        }

        @Override
        public void run() {
            long startTime;
            boolean lastPacketInBlock = false;
            long l = startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0L;
            while (this.isRunning() && !lastPacketInBlock) {
                long totalAckTimeNanos = 0L;
                boolean isInterrupted = false;
                try {
                    Packet pkt = null;
                    long expected = -2L;
                    PipelineAck ack = new PipelineAck();
                    long seqno = -2L;
                    long ackRecvNanoTime = 0L;
                    try {
                        if (this.type != PacketResponderType.LAST_IN_PIPELINE && !BlockReceiver.this.mirrorError) {
                            DataTransferProtos.Status oobStatus;
                            ack.readFields(this.downstreamIn);
                            ackRecvNanoTime = System.nanoTime();
                            if (LOG.isDebugEnabled()) {
                                LOG.debug(this.myString + " got " + ack);
                            }
                            if ((oobStatus = ack.getOOBStatus()) != null) {
                                LOG.info("Relaying an out of band ack of type " + oobStatus);
                                this.sendAckUpstream(ack, -2L, 0L, 0L, PipelineAck.combineHeader(BlockReceiver.this.datanode.getECN(), DataTransferProtos.Status.SUCCESS));
                                continue;
                            }
                            seqno = ack.getSeqno();
                        }
                        if (seqno != -2L || this.type == PacketResponderType.LAST_IN_PIPELINE) {
                            pkt = this.waitForAckHead(seqno);
                            if (!this.isRunning()) break;
                            expected = pkt.seqno;
                            if (this.type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE && seqno != expected) {
                                throw new IOException(this.myString + "seqno: expected=" + expected + ", received=" + seqno);
                            }
                            if (this.type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
                                totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
                                long ackTimeNanos = totalAckTimeNanos - ack.getDownstreamAckTimeNanos();
                                if (ackTimeNanos < 0L) {
                                    if (LOG.isDebugEnabled()) {
                                        LOG.debug("Calculated invalid ack time: " + ackTimeNanos + "ns.");
                                    }
                                } else {
                                    ((BlockReceiver)BlockReceiver.this).datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
                                }
                            }
                            lastPacketInBlock = pkt.lastPacketInBlock;
                        }
                    }
                    catch (InterruptedException ine) {
                        isInterrupted = true;
                    }
                    catch (IOException ioe) {
                        if (Thread.interrupted()) {
                            isInterrupted = true;
                        }
                        if (ioe instanceof EOFException && !BlockReceiver.this.packetSentInTime()) {
                            LOG.warn("The downstream error might be due to congestion in upstream including this node. Propagating the error: ", ioe);
                            throw ioe;
                        }
                        BlockReceiver.this.mirrorError = true;
                        LOG.info(this.myString, ioe);
                    }
                    if (Thread.interrupted() || isInterrupted) {
                        LOG.info(this.myString + ": Thread is interrupted.");
                        this.running = false;
                        continue;
                    }
                    if (lastPacketInBlock) {
                        this.finalizeBlock(startTime);
                    }
                    DataTransferProtos.Status myStatus = pkt != null ? pkt.ackStatus : DataTransferProtos.Status.SUCCESS;
                    this.sendAckUpstream(ack, expected, totalAckTimeNanos, pkt != null ? pkt.offsetInBlock : 0L, PipelineAck.combineHeader(BlockReceiver.this.datanode.getECN(), myStatus));
                    if (pkt == null) continue;
                    this.removeAckHead();
                }
                catch (IOException e) {
                    LOG.warn("IOException in BlockReceiver.run(): ", e);
                    if (!this.running) continue;
                    BlockReceiver.this.datanode.checkDiskErrorAsync();
                    LOG.info(this.myString, e);
                    this.running = false;
                    if (Thread.interrupted()) continue;
                    this.receiverThread.interrupt();
                }
                catch (Throwable e) {
                    if (!this.running) continue;
                    LOG.info(this.myString, e);
                    this.running = false;
                    this.receiverThread.interrupt();
                }
            }
            LOG.info(this.myString + " terminating");
        }

        private void finalizeBlock(long startTime) throws IOException {
            long endTime = 0L;
            try (ReplicaHandler handler = BlockReceiver.this.claimReplicaHandler();){
                BlockReceiver.this.close();
                endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0L;
                BlockReceiver.this.block.setNumBytes(BlockReceiver.this.replicaInfo.getNumBytes());
                ((BlockReceiver)BlockReceiver.this).datanode.data.finalizeBlock(BlockReceiver.this.block);
            }
            if (BlockReceiver.this.pinning) {
                ((BlockReceiver)BlockReceiver.this).datanode.data.setPinning(BlockReceiver.this.block);
            }
            BlockReceiver.this.datanode.closeBlock(BlockReceiver.this.block, "", BlockReceiver.this.replicaInfo.getStorageUuid());
            if (ClientTraceLog.isInfoEnabled() && BlockReceiver.this.isClient) {
                long offset = 0L;
                DatanodeRegistration dnR = BlockReceiver.this.datanode.getDNRegistrationForBP(BlockReceiver.this.block.getBlockPoolId());
                ClientTraceLog.info(String.format("src: %s, dest: %s, bytes: %s, op: %s, cliID: %s, offset: %s, srvID: %s, blockid: %s, duration: %s", BlockReceiver.this.inAddr, BlockReceiver.this.myAddr, BlockReceiver.this.block.getNumBytes(), "HDFS_WRITE", BlockReceiver.this.clientname, offset, dnR.getDatanodeUuid(), BlockReceiver.this.block, endTime - startTime));
            } else {
                LOG.info("Received " + BlockReceiver.this.block + " size " + BlockReceiver.this.block.getNumBytes() + " from " + BlockReceiver.this.inAddr);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void sendAckUpstream(PipelineAck ack, long seqno, long totalAckTimeNanos, long offsetInBlock, int myHeader) throws IOException {
            try {
                PacketResponder packetResponder = this;
                synchronized (packetResponder) {
                    while (this.sending) {
                        this.wait();
                    }
                    this.sending = true;
                }
                try {
                    if (!this.running) {
                        return;
                    }
                    this.sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos, offsetInBlock, myHeader);
                }
                finally {
                    packetResponder = this;
                    synchronized (packetResponder) {
                        this.sending = false;
                        this.notify();
                    }
                }
            }
            catch (InterruptedException ie) {
                this.running = false;
            }
        }

        private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno, long totalAckTimeNanos, long offsetInBlock, int myHeader) throws IOException {
            int[] replies;
            if (ack == null) {
                replies = new int[]{myHeader};
            } else if (BlockReceiver.this.mirrorError) {
                int h2 = PipelineAck.combineHeader(BlockReceiver.this.datanode.getECN(), DataTransferProtos.Status.SUCCESS);
                int h1 = PipelineAck.combineHeader(BlockReceiver.this.datanode.getECN(), DataTransferProtos.Status.ERROR);
                replies = new int[]{h2, h1};
            } else {
                int ackLen = this.type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack.getNumOfReplies();
                replies = new int[ackLen + 1];
                replies[0] = myHeader;
                for (int i = 0; i < ackLen; ++i) {
                    replies[i + 1] = ack.getHeaderFlag(i);
                }
                if (ackLen > 0 && PipelineAck.getStatusFromHeader(replies[1]) == DataTransferProtos.Status.ERROR_CHECKSUM) {
                    throw new IOException("Shutting down writer and responder since the down streams reported the data sent by this thread is corrupt");
                }
            }
            PipelineAck replyAck = new PipelineAck(seqno, replies, totalAckTimeNanos);
            if (replyAck.isSuccess() && offsetInBlock > BlockReceiver.this.replicaInfo.getBytesAcked()) {
                BlockReceiver.this.replicaInfo.setBytesAcked(offsetInBlock);
            }
            long begin = Time.monotonicNow();
            replyAck.write(this.upstreamOut);
            this.upstreamOut.flush();
            long duration = Time.monotonicNow() - begin;
            if (duration > BlockReceiver.this.datanodeSlowLogThresholdMs) {
                LOG.warn("Slow PacketResponder send ack to upstream took " + duration + "ms (threshold=" + BlockReceiver.this.datanodeSlowLogThresholdMs + "ms), " + this.myString + ", replyAck=" + replyAck);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug(this.myString + ", replyAck=" + replyAck);
            }
            DataTransferProtos.Status myStatus = PipelineAck.getStatusFromHeader(myHeader);
            if (myStatus == DataTransferProtos.Status.ERROR_CHECKSUM) {
                throw new IOException("Shutting down writer and responder due to a checksum error in received data. The error response has been sent upstream.");
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void removeAckHead() {
            LinkedList<Packet> linkedList = this.ackQueue;
            synchronized (linkedList) {
                this.ackQueue.removeFirst();
                this.ackQueue.notifyAll();
            }
        }
    }

    private static enum PacketResponderType {
        NON_PIPELINE,
        LAST_IN_PIPELINE,
        HAS_DOWNSTREAM_IN_PIPELINE;

    }
}

