Hadoop源码注释 - 块读取
DFSClient.RemoteBlockReader.newBlockReader()
public static BlockReader newBlockReader( Socket sock, String file, long blockId, Token<BlockTokenIdentifier> accessToken, long genStamp, long startOffset, long len, int bufferSize, boolean verifyChecksum, String clientName) throws IOException { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))); // write the header. 使用OutputStream发起读数据块的请求头报文 out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);// ① out.write( DataTransferProtocol.OP_READ_BLOCK);// ② out.writeLong( blockId );// ③ out.writeLong( genStamp );// ④ out.writeLong( startOffset );// ⑤ out.writeLong( len );// ⑥ Text.writeString(out, clientName);// ⑦ accessToken.write(out);// ⑧ out.flush(); // Get bytes in block, set streams 使用InputStream接收DataNode传回来的数据 DataInputStream in = new DataInputStream(new BufferedInputStream(NetUtils.getInputStream(sock), bufferSize)); short status = in.readShort();// [1] 读取状态信息 if (status != DataTransferProtocol.OP_STATUS_SUCCESS) { } // throw Exception... DataChecksum checksum = DataChecksum.newDataChecksum( in );// [2-1] checksumHeader(校验类型和校验块大小) long firstChunkOffset = in.readLong();// [2-2] Read the first chunk offset第一个校验块的起始位置 if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) { throw new IOException("BlockReader: error in first chunk offset (" + firstChunkOffset + ") startOffset is " + startOffset + " for file " + file); } return new RemoteBlockReader(file, blockId, in, checksum, verifyChecksum, startOffset, firstChunkOffset, sock); }
?
?
DataXceiver.readBlock()
/** Read a block from the disk. 读取本地磁盘上的数据块, 用于发送给客户端 * @param in The stream to read from 输入流,客户端发送的请求头数据, 用于读取解析数据构造BlockSender */ private void readBlock(DataInputStream in) throws IOException { // 1. Read in the header 读取客户端发送的请求头信息 // ① ② 在DataXceiver的run方法已经读取过了,然后分发到不同的子程序处理 long blockId = in.readLong(); // ③ 8bytes的blockId. Block block = new Block( blockId, 0 , in.readLong()); // ④ 8bytes的数据块版本号 long startOffset = in.readLong(); // ⑤ 8bytes的startOffset开始读取的Block的偏移量 long length = in.readLong(); // ⑥ 8bytes的读取长度 String clientName = Text.readString(in); // ⑦ 发送请求的客户端名称 Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>(); accessToken.readFields(in);// ⑧ 数据块的访问权限, 安全相关, 不讨论 OutputStream baseStream = NetUtils.getOutputStream(s, datanode.socketWriteTimeout); DataOutputStream out = new DataOutputStream(new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE)); // blockToken: If InvalidToken, out.writeShort(OP_STATUS_ERROR_ACCESS_TOKEN) // 2. send the block BlockSender blockSender = null; try { try { blockSender = new BlockSender(block, startOffset, length, true, true, false, datanode, clientTraceFmt); } catch(IOException e) { out.writeShort(DataTransferProtocol.OP_STATUS_ERROR); // [1] 创建BlockSender对象失败,发送操作失败的状态标示返回给客户端 throw e; // --> catch(IOException e) 不会执行下面的语句哦 } out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // [1] send op status 创建BlockSender成功,发送成功状态标示返回给客户端 long read = blockSender.sendBlock(out, baseStream, null); // send data [2] 会发送checksumHeader + offset + PACKET if (blockSender.isBlockReadFully()) { // 客户端读取完整个数据块, 由客户端验证文件的校验和. 而不是在发送数据时验证 // 3. See if client verification succeeded. This is an optional response from client. if (in.readShort() == DataTransferProtocol.OP_STATUS_CHECKSUM_OK && datanode.blockScanner != null) { datanode.blockScanner.verifiedByClient(block); } } } catch ( SocketException ignored ) { // Its ok for remote side to close the connection anytime. } catch ( IOException ioe ) { throw ioe; } finally { IOUtils.closeStream(out); IOUtils.closeStream(blockSender); } }
?
?
BlockSender.sendBlock()
?
/** sendBlock() is used to read block and its metadata and stream the data to either a client or to another datanode. * 读取块和元数据, 通过输出流传送给客户端(读取请求)或者另一个datanode(复制请求) * @param out stream to which the block is written to 数据块要写入的输出流, 输出流即发送出去 * @param baseStream optional. if non-null, out is assumed to be a wrapper over this stream. * This enables optimizations for sending the data, e.g. SocketOutputStream#transferToFully(FileChannel, long, int). 使用FileChannel优化发送数据 * @param throttler for sending data. 发送数据节流器 * @return total bytes reads, including crc. 总共读取的字节,包括校验文件 */ long sendBlock(DataOutputStream out, OutputStream baseStream, BlockTransferThrottler throttler) throws IOException { this.throttler = throttler; initialOffset = offset; long totalRead = 0; OutputStream streamForSendChunks = out; lastCacheDropOffset = initialOffset; if (isLongRead() && blockInFd != null) { // Advise that this file descriptor will be accessed sequentially. NativeIO.posixFadviseIfPossible(blockInFd, 0, 0, NativeIO.POSIX_FADV_SEQUENTIAL); } manageOsCache(); // Trigger readahead of beginning of file if configured. try { // 1. 将checksum header通过DataOutputStream发送到客户端. 客户端通过DataInputStream接收 try { checksum.writeHeader(out); // ① ② 写入checksum header . DataChecksum在构造函数中构造,包含type, bytePerChecksum if ( chunkOffsetOK ) {// ③ 需要发送块的开始位置, 还需要写入offset out.writeLong( offset ); } out.flush(); } catch (IOException e) { throw ioeToSocketException(e); } //socket error // 2. 计算每个packet数据(checksum和数据)的大小 分配packet缓冲区大小(packet header + data + checksum) int maxChunksPerPacket; // 一次Packet可以发送最多多少个chunks int pktSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER; // [1]packet header+length if (transferToAllowed && !verifyChecksum && baseStream instanceof SocketOutputStream && blockIn instanceof FileInputStream) { // 检查是或允许transferTo, 使用FileChannel来传输数据, 而不是先将数据读取到缓冲区 FileChannel fileChannel = ((FileInputStream)blockIn).getChannel(); blockInPosition = fileChannel.position(); // blockInPosition also indicates sendChunks() uses transferTo. streamForSendChunks = baseStream; // assure a mininum buffer size. maxChunksPerPacket = (Math.max(BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO) + bytesPerChecksum - 1)/bytesPerChecksum; } else { maxChunksPerPacket = Math.max(1, (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum); } // packet buffer has to be able to do a normal transfer in the case of recomputing checksum pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket; // [2](data length+checksum size)*chunks ByteBuffer pktBuf = ByteBuffer.allocate(pktSize); // 3. 将所有packet写到out. Packet由一系列的Chunk组成. while (endOffset > offset) { manageOsCache(); long len = sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks); // ④ offset += len; totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*checksumSize); seqno++; } // 4. 将一整数(int)0写到out, 标记块的结束 mark the end of block out.writeInt(0); // ⑤ out.flush(); } catch (RuntimeException e) { throw new IOException("unexpected runtime exception", e); } finally { close();} blockReadFully = (initialOffset == 0 && offset >= blockLength); return totalRead; }
?
BlockSender.sendChunks()
/** Sends upto maxChunks chunks of data. 发送一个数据包 */ private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) throws IOException { // 1. 计算数据包的长度 // Sends multiple chunks in one packet with a single write(). int len = (int) Math.min(endOffset - offset, (((long) bytesPerChecksum) * ((long) maxChunks))); // truncate len so that any partial chunks will be sent as a final packet. this is not necessary for correctness, // but partial chunks are ones that may be recomputed and sent via buffer copy, so try to minimize those bytes if (len > bytesPerChecksum && len % bytesPerChecksum != 0) { len -= len % bytesPerChecksum; } if (len == 0) return 0; int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum; // 计算这个数据包中应该包含有多少个校验数据块 int packetLen = len + numChunks*checksumSize + 4; // len为数据长度, 中间为校验和的长度, 最后的4为下面⑤即数据长度 pkt.clear(); // 参数pkt是在sendBlock()经过计算pktSize的缓冲区 因此使用前要先清空buffer // packetLen是从数据长度字段开始的长度即包括⑤之后的长度. ⑤为4bytes, 然后分别是真正的数据和校验和的长度. // 实际上pkt的大小确定了下面要往该缓冲区pkt放入的数据的多少. 比如packet header分别从①-⑤, 对应的大小为4+8+8+1+4=21+4. // 2. 数据包头部信息写入缓冲区 write packet header pkt.putInt(packetLen);// ① 数据包长度 pkt.putLong(offset);// ② 数据包中的数据在Block中的开始位置 pkt.putLong(seqno);// ③ 数据包的编号 pkt.put((byte)((offset + len >= endOffset) ? 1 : 0)); // ④ 是否有数据包标志 ②③④其实对应sendBlock()中while循环的处理 pkt.putInt(len);// ⑤ 数据包中数据的长度 // 3. 校验和写入缓冲区 注: 此时还没发送到客户端. 只有通过OutputStream将buf内容写到OutputStream, 才算发送到客户端 int checksumOff = pkt.position(); // 当前缓冲区的位置及校验和的开始位置. 由此可见首先发送校验和 int checksumLen = numChunks * checksumSize; // 校验和的长度=chunks*4 byte[] buf = pkt.array(); // 缓冲区的大小为pktSize的大小. 用来暂存接下来的校验和和数据. 注: 将数据写到缓冲区中! if (checksumSize > 0 && checksumIn != null) {// ⑥ 从checksumIn输入流将校验和数据读取到缓冲区buf中 checksumIn.readFully(buf, checksumOff, checksumLen); // 将输入流的数据读取到buf开始位置为checksumOff, 长为checksumLen的区域 } // 通过pkt.position计算checksumOff, pkt前面存放了packet header. checksum要接着packet header int dataOff = checksumOff + checksumLen; // 校验和的开始位置+长度=数据的偏移量/开始位置 if (blockInPosition < 0) {// 如果>=0, 则使用零拷贝. 默认为-1. 如果允许零拷贝, 在sendBlock时会设置该值>=0 // 4. 数据写入缓冲区 normal transfer 从blockIn输入流读取块数据到缓冲区中偏移量为dataOff, 长度为len的区域 IOUtils.readFully(blockIn, buf, dataOff, len); // buf紧接着checksum的是data. data的开始位置为checksum的结束位置+1. +1其实由position内部实现 // 5. 对发送的数据验证校验和 (客户端读取数据不会执行此校验) if (verifyChecksum) { int dOff = dataOff; // 要计算的校验块(真正的数据)的起始位置 int cOff = checksumOff; // 校验块对应的校验和的起始位置. 校验块的起始位置每+512bytes, 校验和的起始位置就+4bytes int dLeft = len; // len最开始为多个chunk>512, min会取bytesPerChecksum=512. 在计算每个校验块后, dLeft递减512, 最后 dLeft可能<512 for (int i=0; i<numChunks; i++) { // 对每个校验块chunk计算校验和 checksum.reset(); int dLen = Math.min(dLeft, bytesPerChecksum); // 如果len不足512bytes, 则只对这部分数据计算校验和, 因为最后一部分可能不足512bytes checksum.update(buf, dOff, dLen); // 缓冲区buf存放的就是已经从blockIn读取出来的真正数据了. 可以直接对数据计算校验和 if (!checksum.compare(buf, cOff)) { // 比较校验和cOff开始和经过上面计算的校验和 throw new ChecksumException("Checksum failed at " + (offset + len - dLeft), len); } dLeft -= dLen; // 剩余参与计算的校验块的大小递减 dOff += dLen; // 起始位置递增, 即参与计算的下一个校验块的起始位置 cOff += checksumSize; // 校验和的起始位置经过一个校验块的计算也递增4bytes, 为的是和下一个校验块经过计算的校验和进行比较 } } // only recompute checksum if we can't trust the meta data due to concurrent writes 存在竞争条件重新计算校验和 if (memoizedBlock.hasBlockChanged(len)) {ChecksumUtil.updateChunkChecksum(buf, checksumOff, dataOff, len, checksum);} // 6. 将缓冲区的数据全部写到输出流OutputStream中, 完成向接收端的数据发送 out.write(buf, 0, dataOff + len); // ⑦ 缓冲区从0开始, 一直到真正数据的结束位置. 即发送整个PACKET } else { // 如果允许零拷贝, 在调用该方法之前的sendBlock就设置了blockInPosition为正数. 就会执行此零拷贝的优化方式. try {// use transferTo(). Checks on out and blockIn are already done. 通过Socket发送, 即使用FileChannel来优化发送数据, 而不是通过流的方式 // 4. 采用零拷贝主要针对的是要发送的数据. PACKET的header和checksum并没有使用零拷贝. 因为那部分数据比较小. SocketOutputStream sockOut = (SocketOutputStream) out;// 在sendBlock中已经确保了对象类型的正确性, 才允许进入零拷贝 FileChannel fileChannel = ((FileInputStream) blockIn).getChannel();// 所以这里可以放心将out和blockIn转为零拷贝需要的类型 if (memoizedBlock.hasBlockChanged(len)) { fileChannel.position(blockInPosition); IOUtils.readFileChannelFully(fileChannel, buf, dataOff, len); ChecksumUtil.updateChunkChecksum(buf, checksumOff, dataOff, len, checksum); sockOut.write(buf, 0, dataOff + len); // ⑦ 数据块 } else { // 5. 首先将buf缓冲区的数据先发送到接收端对应的SocketOutputStream. 这部分数据是PACKET的header和checksum sockOut.write(buf, 0, dataOff);// ⑦ 使用Socket输出流发送缓存数据包,直接写到Socket输出流 first write the packet // 6. 接着使用零拷贝发送真正的数据. 普通的传输方式是将blockIn先写到buf中, 这之后还要将buf中的数据发送到接收端的OutputStream. // 而零拷贝直接将blockIn传输到接收端的socketOut, 免去了中间多余的两部分内存拷贝操作和上下文切换带来的系统开销. sockOut.transferToFully(fileChannel, blockInPosition, len); // no need to flush. since we know out is not a buffered stream. 零拷贝! } blockInPosition += len; } catch (IOException e) { // exception while writing to the client (well, with transferTo(), it could also be while reading from the local file). throw ioeToSocketException(e); } } if (throttler != null) { throttler.throttle(packetLen); } // rebalancing so throttle 调整发送速度 return len; }
?
全文请期待 http://zqhxuyuan.github.com 的相关博文