首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 互联网 >

第七章:小朱札记hadoop之源码分析-hdfs分析 Datanode 心跳分析

2013-06-26 
第七章:小朱笔记hadoop之源码分析-hdfs分析 Datanode 心跳分析第七章:小朱笔记hadoop之源码分析-hdfs分析

第七章:小朱笔记hadoop之源码分析-hdfs分析 Datanode 心跳分析

第七章:小朱笔记hadoop之源码分析-hdfs分析

第五节:Datanode 分析

5.2 Datanode 心跳分析

(1)offerService分析

/** * Main loop for the DataNode. Runs until shutdown, * forever calling remote NameNode functions. * * 1.检查心跳间隔是否超时,如是向namenode发送心跳报告,内容是dfs的容量、剩余的空间和DataXceiverServer的数量等,调用processCommand方法处理namenode返回的命令 * 2.通知namenode已经接收的块 * 3.检查块报告间隔是否超时,如是向namenode发送块报告,调用processCommand方法处理namenode返回的命令 * 4.如果没到下个发送心跳的时候,休眠 * * * DNA_UNKNOWN = 0:未知操作 * DNA_TRANSFER = 1:传输块到另一个datanode,创建DataTransfer来传输每个块,请求的类型是OP_WRITE_BLOCK,使用BlockSender来发送块和元数据文件,不对块进行校验 * DNA_INVALIDATE = 2:不合法的块,将所有块删除 * DNA_SHUTDOWN = 3:停止datanode,停止infoServer、DataXceiverServer、DataBlockScanner和处理线程,将存储目录解锁,DataBlockScanner结束可能需要等待1小时 * DNA_REGISTER = 4:重新注册 * DNA_FINALIZE = 5:完成升级,调用DataStorage的finalizeUpgrade方法完成升级 * DNA_RECOVERBLOCK = 6:请求块恢复,创建线程来恢复块,每个线程服务一个块,对于每个块,调用recoverBlock来恢复块信息 * * 会利用保存在receivedBlockList和delHints两个列表中的信息。 * receivedBlockList表明在这个DataNode成功创建的新的数据块 * delHints,是可以删除该数据块的节点 */ public void offerService() throws Exception { LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec" + " Initial delay: " + initialBlockReportDelay + "msec"); // // Now loop for a long time.... // while (shouldRun) { try { long startTime = now(); // // Every so often, send heartbeat or block-report // if (startTime - lastHeartbeat > heartBeatInterval) { //定期发送心跳 // // All heartbeat messages include following info: // -- Datanode name // -- data transfer port // -- Total capacity // -- Bytes remaining // lastHeartbeat = startTime; DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration, data.getCapacity(), data.getDfsUsed(), data.getRemaining(), xmitsInProgress.get(), getXceiverCount()); myMetrics.addHeartBeat(now() - startTime); //LOG.info("Just sent heartbeat, with name " + localName); //响应namenode返回的命令做处理 if (!processCommand(cmds)) continue; } // check if there are newly received blocks Block [] blockArray=null; String [] delHintArray=null; synchronized(receivedBlockList) { synchronized(delHints) { int numBlocks = receivedBlockList.size(); if (numBlocks > 0) { if(numBlocks!=delHints.size()) { LOG.warn("Panic: receiveBlockList and delHints are not of the same length" ); } // // Send newly-received blockids to namenode // blockArray = receivedBlockList.toArray(new Block[numBlocks]);// receivedBlockList表明在这个DataNode成功创建的新的数据块,而delHints,是可以删除该数据块的节点 delHintArray = delHints.toArray(new String[numBlocks]);//在datanode.notifyNamenodeReceivedBlock函数中发生变化 } } } if (blockArray != null) { if(delHintArray == null || delHintArray.length != blockArray.length ) { LOG.warn("Panic: block array & delHintArray are not the same" ); } namenode.blockReceived(dnRegistration, blockArray, delHintArray);;//Block状态变化报告通过NameNode.blockReceived来报告。 synchronized (receivedBlockList) { synchronized (delHints) { for(int i=0; i<blockArray.length; i++) { receivedBlockList.remove(blockArray[i]); delHints.remove(delHintArray[i]); } } } } // Send latest blockinfo report if timer has expired. if (startTime - lastBlockReport > blockReportInterval) {// 向namenode报告系统中Block状态的变化 if (data.isAsyncBlockReportReady()) { // Create block report long brCreateStartTime = now(); Block[] bReport = data.retrieveAsyncBlockReport(); // Send block report long brSendStartTime = now(); //向Namenode报告其上的块状态报告 DatanodeCommand cmd = namenode.blockReport(dnRegistration, BlockListAsLongs.convertToArrayLongs(bReport)); // Log the block report processing stats from Datanode perspective long brSendCost = now() - brSendStartTime; long brCreateCost = brSendStartTime - brCreateStartTime; myMetrics.addBlockReport(brSendCost); LOG.info("BlockReport of " + bReport.length + " blocks took " + brCreateCost + " msec to generate and " + brSendCost + " msecs for RPC and NN processing"); // // If we have sent the first block report, then wait a random // time before we start the periodic block reports. // if (resetBlockReportTime) { lastBlockReport = startTime - R.nextInt((int)(blockReportInterval)); resetBlockReportTime = false; } else { /* say the last block report was at 8:20:14. The current report * should have started around 9:20:14 (default 1 hour interval). * If current time is : * 1) normal like 9:20:18, next report should be at 10:20:14 * 2) unexpected like 11:35:43, next report should be at * 12:20:14 */ lastBlockReport += (now() - lastBlockReport) / blockReportInterval * blockReportInterval; } processCommand(cmd); } else { data.requestAsyncBlockReport(); if (lastBlockReport > 0) { // this isn't the first report long waitingFor = startTime - lastBlockReport - blockReportInterval; String msg = "Block report is due, and been waiting for it for " + (waitingFor/1000) + " seconds..."; if (waitingFor > LATE_BLOCK_REPORT_WARN_THRESHOLD) { LOG.warn(msg); } else if (waitingFor > LATE_BLOCK_REPORT_INFO_THRESHOLD) { LOG.info(msg); } else if (LOG.isDebugEnabled()) { LOG.debug(msg); } } } } // start block scanner;//启动blockScanner线程 进行block扫描 if (blockScanner != null && blockScannerThread == null && upgradeManager.isUpgradeCompleted()) { LOG.info("Starting Periodic block scanner."); blockScannerThread = new Daemon(blockScanner); blockScannerThread.start(); } // // There is no work to do; sleep until hearbeat timer elapses, // or work arrives, and then iterate again. // long waitTime = heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat); synchronized(receivedBlockList) { if (waitTime > 0 && receivedBlockList.size() == 0) { try { receivedBlockList.wait(waitTime); } catch (InterruptedException ie) { } delayBeforeBlockReceived(); } } // synchronized } catch(RemoteException re) { String reClass = re.getClassName(); if (UnregisteredDatanodeException.class.getName().equals(reClass) || DisallowedDatanodeException.class.getName().equals(reClass) || IncorrectVersionException.class.getName().equals(reClass)) { LOG.warn("DataNode is shutting down: " + StringUtils.stringifyException(re)); shutdown(); return; } LOG.warn(StringUtils.stringifyException(re)); } catch (IOException e) { LOG.warn(StringUtils.stringifyException(e)); } } // while (shouldRun) } // offerService

?

?

(2)processCommand分析

/** * DNA_UNKNOWN = 0:未知操作 * DNA_TRANSFER = 1:传输块到另一个datanode,创建DataTransfer来传输每个块,请求的类型是OP_WRITE_BLOCK,使用BlockSender来发送块和元数据文件,不对块进行校验 * DNA_INVALIDATE = 2:不合法的块,将所有块删除 * DNA_SHUTDOWN = 3:停止datanode,停止infoServer、DataXceiverServer、DataBlockScanner和处理线程,将存储目录解锁,DataBlockScanner结束可能需要等待1小时 * DNA_REGISTER = 4:重新注册 * DNA_FINALIZE = 5:完成升级,调用DataStorage的finalizeUpgrade方法完成升级 * DNA_RECOVERBLOCK = 6:请求块恢复,创建线程来恢复块,每个线程服务一个块,对于每个块,调用recoverBlock来恢复块信息 * * @param cmd * @return true if further processing may be required or false otherwise. * @throws IOException */ private boolean processCommand(DatanodeCommand cmd) throws IOException { if (cmd == null) return true; final BlockCommand bcmd = cmd instanceof BlockCommand? (BlockCommand)cmd: null; switch(cmd.getAction()) { //传输块到另一个datanode,创建DataTransfer来传输每个块,请求的类型是OP_WRITE_BLOCK,使用BlockSender来发送块和元数据文件,不对块进行校验 case DatanodeProtocol.DNA_TRANSFER: // Send a copy of a block to another datanode transferBlocks(bcmd.getBlocks(), bcmd.getTargets()); myMetrics.incrBlocksReplicated(bcmd.getBlocks().length); break; //不合法的块,将所有块删除 case DatanodeProtocol.DNA_INVALIDATE: // // Some local block(s) are obsolete and can be // safely garbage-collected. // Block toDelete[] = bcmd.getBlocks(); try { if (blockScanner != null) { blockScanner.deleteBlocks(toDelete); } data.invalidate(toDelete); } catch(IOException e) { checkDiskError(); throw e; } myMetrics.incrBlocksRemoved(toDelete.length); break; //停止datanode,停止infoServer、DataXceiverServer、DataBlockScanner和处理线程,将存储目录解锁,DataBlockScanner结束可能需要等待1小时 case DatanodeProtocol.DNA_SHUTDOWN: // shut down the data node this.shutdown(); return false; //重新注册 case DatanodeProtocol.DNA_REGISTER: // namenode requested a registration - at start or if NN lost contact LOG.info("DatanodeCommand action: DNA_REGISTER"); if (shouldRun) { register(); } break; //完成升级,调用DataStorage的finalizeUpgrade方法完成升级 case DatanodeProtocol.DNA_FINALIZE: storage.finalizeUpgrade(); break; case UpgradeCommand.UC_ACTION_START_UPGRADE: // start distributed upgrade here processDistributedUpgradeCommand((UpgradeCommand)cmd); break; //请求块恢复,创建线程来恢复块,每个线程服务一个块,对于每个块,调用recoverBlock来恢复块信息 case DatanodeProtocol.DNA_RECOVERBLOCK: recoverBlocks(bcmd.getBlocks(), bcmd.getTargets()); break; case DatanodeProtocol.DNA_ACCESSKEYUPDATE: LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE"); if (isBlockTokenEnabled) { blockTokenSecretManager.setKeys(((KeyUpdateCommand) cmd).getExportedKeys()); } break; //块均衡 case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE: LOG.info("DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE"); int vsn = ((BalancerBandwidthCommand) cmd).getBalancerBandwidthVersion(); if (vsn >= 1) { long bandwidth = ((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue(); if (bandwidth > 0) { DataXceiverServer dxcs = (DataXceiverServer) this.dataXceiverServer.getRunnable(); dxcs.balanceThrottler.setBandwidth(bandwidth); } } break; default: LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction()); } return true; }

?

?

?

?

?

热点排行