DataNode本地数据存储和管理
由于DataNode数可以成千上万,NameNode只有一个,为了减轻NameNode的负担,NameNode上并不永久保存那个DataNode上有那些数据块的信息,而是通过DataNode启动时的上报,来更新NameNode上的映射表。
DataNode和NameNode建立连接以后,就会不断地和NameNode保持心跳。心跳的返回包含了NameNode对DataNode的一些命令,如删除数据库或者是把数据块复制到另一个DataNode。NameNode不会发起到DataNode的请求,在通信过程中,它们是严格的客户端/服务器架构。
在客户端做写操作的时候,DataNode需要相互配合,保证写操作的一致性。
DataNode的实现包括两部分,一部分是对本地数据块的管理,另一部分,就是和其他的实体打交道。
-----------------------DataNode本地数据存储结构Storage类------------------------------------------
所有的数据就存放在dfs/data里面:
storage里存的东西是一些出错信息;
in_use.lock是一个空文件,它的作用是如果需要对整个系统做排斥操作,应用应该获取它,以它为锁;
current存的是当前有效的数据块,detach存放的是分离文件(用于分离硬链接,即copy-on-write),tmp保存的是一些操作需要的临时数据块。current下包含了一系列的数据块文件和数据块元数据文件;
数据块文件显然保存了HDFS中的数据,数据块最大可以刡64M。每个数据块文件都会有对应的数据块元数据文件。里面存放的是数据块的校验信息。下面是数据块文件名和它的元数据文件名的例子:
blk_3148782637964391313 ---->数据文件
blk_3148782637964391313_242812.meta --->元数据文件(校验文件)
上面的例子中,3148782637964391313是数据块的ID号,242812是数据块的stamp,用于一致性检查。
在current目录下有下面几个文件:
dncp_block_verification.log.curr和dncp_block_verification.log.prev,它记录了一些DataNode对文件系定时统做一致性检查需要的信息。
VERSION,保存了一些文件系统的元信息。
DataNode存储的目录结构:
${dfs.data.dir}/current/VERSION
/blk_<id_1>
/blk_<id_1>_genStamp.meta
/blk_<id_2>
/blk_<id_2>_genStamp.meta
/...
/blk_<id_64>
/blk_<id_64>_genStamp.meta
/subdir0/
/subdir1/
/...
/subdir63/
/previous/
/detach/
/tmp/
/in_use.lock
/storage
类Storage保存了和存储相关的信息,它继承了StorageInfo。StorageInfo是一个简单的存储信息类,包含变量:
public int layoutVersion; // HDFS的固定数据结构的版本
public int namespaceID; // storage的namespaceId
public long cTime; //创建的时间
其中,HDFS的固定数据结构的版本是由一个叫layoutVersion负整数定义的,这个版本号与Hadoop分布的发行号是不相干的。无论何时版本号都是减一(比如:版本-18过了就是-19)。当版本号变小时HDFS就需要升级,因此如果HDFS保存的layout是个旧的版本,那么新的namenode(datanode)就无法操作。
Storage.StorageDirectory是存储文件夹类:包含了备份/恢复机制,Storage.StorageDirectory.analyzeStorage通过判断文件夹current,previous,previous.tmp,removed.tmp,finalized.Tmp,checkpoint.tmp的存在组合情况,来检测该文件夹下的一致性(由于升级/回滚/提交finalize是一个过程,所以可能出现中途中断的情况),返回相应的状态。
StorageDirectory.doRecover根据返回的状态在doRecover中进行相应的恢复操作:
COMPLETE_UPGRADE:mv previous.tmp -> previous
RECOVER_UPGRADE:mv previous.tmp -> current
COMPLETE_FINALIZE:rm finalized.tmp
COMPLETE_ROLLBACK:rm removed.tmp
RECOVER_ROLLBACK:mv removed.tmp -> current
COMPLETE_CHECKPOINT:mv lastcheckpoint.tmp -> previous.checkpoint
RECOVER_CHECKPOINT:mv lastcheckpoint.tmp -> current
current文件夹下的VERSION就是通过StorageDirectory.read/write进行生成读取的。
storageDirectory.lock/unlock用于控制多个并发程序对文件夹的访问操作。lock通过调用FileChannel.tryLock对文件in_use.lock进行锁定,要对这个文件夹进行操作时候要先获取这个锁,如果获取不到就不能访问。在获取文件夹的状态时要进行lock,因为获取后要在doRecover中根据状态对文件夹进行相应的恢复操作,在doRecover结束后进行unlock。即在对文件夹操作前要lock,操作结束后unlock。
Storage.DirIterator是一个用于访问存储文件夹类型为DirIterator.dirType的迭代器。其hasNext和next都与List操作一样,只是对要访问的类型做了限定,是DirIterator.dirType类型的元素,其他类型文件视为透明。
数据是存储在Storage.storageDirs这些文件夹下的。
DataStorage是Storage的子类,用于描述datanode上的存储文件夹(datanode下可以有多个文件夹用于存放数据,命名为subdir0...。因为HDFS规定了一个目录能存放Block的数目(dfs.datanode.numblocks,默认64)。如果数目多了会用创建新的子目录来存放新块。),并拥有update/rollback/finalize等存储文件夹操作,在recoverTransitionRead调用StorageDirectory.analyzeStorage分析文件夹的状态,出错时调用doRecover进行恢复。
-----------------------DataNode本地数据存储管理FSDataset类------------------------------------------
所有和数据块相关的操作,都在FSDataset相关的类中进行处理。
DataNode的存储结构由大到小为卷FSVolume、目录FSDir、文件(block和元数据等)。
DataNodeBlockInfo类存放的是Block在文件系统上的信息:FSVolume所属的卷,File所属的文件,detached是否分离(即是否不与其他文件建立硬链接)。
DataNodeBlockInfo.DetachFile是用于分离文件。在升级update后,previous目录和current目录会通过硬链接指到同一个文件,previous保存的是旧版本文件,当要修改current的文件时,需要将这个文件进行分离(copy-on-write),使得不会影响到previous中的文件,detachFile就是做这个的:在detach目录下创建复制一个相同的文件,然后将这个文件复制到current下,这样current下文件的硬链接就断掉了。
在DataNode中,不需要考虑块属于哪个文件,‘块文件’指的是数据块的文件,而非所属的文件。
由于HDFS规定了一个目录能存放Block的数目,所以一个Storage上存在多个目录。
对应的,FSDataset中用FSVolume来对应一个Storage(一个dfs中可以有多个Storage),FSDir对应一个目录,所有的FSVolume由FSVolumeSet管理,
FSDataset中通过一个FSVolumeSet对象,就可以管理它的所有存储空间。当然这些文件、文件夹都是指存放在current下。
FSDir是一个目录类,包含变量:
private synchronized List<Thread> tryUpdateBlock( Block oldblock, Block newblock) throws IOException { //check ongoing create threads final ActiveFile activefile = ongoingCreates.get(oldblock);//获取与此块相关的文件及访问这个块的线程 if (activefile != null && !activefile.threads.isEmpty()) {//如果近期有对此块进行操作,返回存活的操作线程 //remove dead threads for(Iterator<Thread> i = activefile.threads.iterator(); i.hasNext(); ) { final Thread t = i.next(); if (!t.isAlive()) { i.remove(); } } //return living threads //近期有对旧块进行操作,将存活的线程返回 if (!activefile.threads.isEmpty()) { return new ArrayList<Thread>(activefile.threads); } } //近期无对此块操作的线程,就更新块 //No ongoing create threads is alive. Update block. File blockFile = findBlockFile(oldblock.getBlockId());//获得旧块的文件 if (blockFile == null) { throw new IOException("Block " + oldblock + " does not exist."); } //获得旧块的元数据文件 File oldMetaFile = findMetaFile(blockFile); //获得旧块中的genStamp long oldgs = parseGenerationStamp(blockFile, oldMetaFile); //rename meta file to a tmp file //先将旧块元数据文件重命名 File tmpMetaFile = new File(oldMetaFile.getParent(), oldMetaFile.getName()+"_tmp" + newblock.getGenerationStamp()); if (!oldMetaFile.renameTo(tmpMetaFile)){ throw new IOException("Cannot rename block meta file to " + tmpMetaFile); } //旧块stamp比新块stamp大,不合法 if (oldgs > newblock.getGenerationStamp()) { throw new IOException("Cannot update block (id=" + newblock.getBlockId() + ") generation stamp from " + oldgs + " to " + newblock.getGenerationStamp()); } //update length //新块的大小大于旧块的大小 if (newblock.getNumBytes() > oldblock.getNumBytes()) { throw new IOException("Cannot update block file (=" + blockFile + ") length from " + oldblock.getNumBytes() + " to " + newblock.getNumBytes()); } //新块的大小小于旧块的大小 if (newblock.getNumBytes() < oldblock.getNumBytes()) { //截断旧块和旧块的元数据文件 truncateBlock(blockFile, tmpMetaFile, oldblock.getNumBytes(), newblock.getNumBytes()); } //rename the tmp file to the new meta file (with new generation stamp) //将重命名后的旧元数据文件重命名为新块元数据文件 File newMetaFile = getMetaFile(blockFile, newblock); if (!tmpMetaFile.renameTo(newMetaFile)) { throw new IOException("Cannot rename tmp meta file to " + newMetaFile); } updateBlockMap(ongoingCreates, oldblock, newblock); updateBlockMap(volumeMap, oldblock, newblock); // verify that the contents of the stored block // matches the block file on disk. validateBlockMetadata(newblock); return null; } //truncateBlock对旧块blockFile和对应的元数据文件metaFile进行截断,截断后旧块长度为newlen(newlen<oldlen)。 static private void truncateBlock(File blockFile, File metaFile, long oldlen, long newlen) throws IOException { if (newlen == oldlen) { return; } if (newlen > oldlen) { throw new IOException("Cannout truncate block to from oldlen (=" + oldlen + ") to newlen (=" + newlen + ")"); } //由于只是对就块进行截断,所有新块的最后一个校验和字段可能在旧块中不一样, //所有setLength进行截断后,要读取最后一个校验和字段 DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum(); int checksumsize = dcs.getChecksumSize(); int bpc = dcs.getBytesPerChecksum(); long n = (newlen - 1)/bpc + 1;//校验和的段数 long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize;//新的校验和文件的长度 long lastchunkoffset = (n - 1)*bpc;//最后一个校验和字段的偏移位置 int lastchunksize = (int)(newlen - lastchunkoffset); //最后一个校验和的开始位置 byte[] b = new byte[Math.max(lastchunksize, checksumsize)]; RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");//对旧块进行读取 try { //truncate blockFile blockRAF.setLength(newlen);//截断旧块 //read last chunk blockRAF.seek(lastchunkoffset);//读取最后一个校验和字段 blockRAF.readFully(b, 0, lastchunksize); } finally { blockRAF.close(); } //compute checksum dcs.update(b, 0, lastchunksize);//计算新块的最后一个校验和字段的校验和 dcs.writeValue(b, 0, false); //update metaFile RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw"); try { metaRAF.setLength(newmetalen);//截断旧块的元数据文件 metaRAF.seek(newmetalen - checksumsize);//移到最后一个校验和字段 metaRAF.write(b, 0, checksumsize);//将最后一个校验和字段校验值写入 } finally { metaRAF.close(); } }
FSDataset.finalizeBlock用于将tmp目录下的块和元数据文件移动到current下,同时将此块从ongoingcreate近期操作列表中移除,并修改volumeMap中块的位置值。提交(或叫:结束finalize)通过writeToBlock打开的block,返意味着写过程没有出错,可以正式把Block从tmp文件夹放到current文件夹。
FSDataset.unfinalizeBlock的工作相反:将块b从ongoingcreate、volumeMap中清除走。被FSDataset.cleanupBlock调用。
FSDataset.invalidate(Block invalidBlks[])用于将invalidBlks块和相应的元数据文件删除掉,从volumeMap中删除掉这个块记录,同时将更新块所在的卷的空间使用情况。
FSDataset.findBlockFile(long blockId) 用于根据blockId查找对应的快文件:先从ongoingcreate近期访问列表中找,找不到再找volumeMap哈希表。