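Why does LoadIncrementalHFiles copy instead of move?
Please credit the source when reposting: http://blackwing.iteye.com/blog/1991901
In an earlier post I wrote a custom job that generates HFiles and loads them into HBase with LoadIncrementalHFiles: http://blackwing.iteye.com/blog/1991380
However, the load turned out to be very slow and failed several times, even though the official documentation says this operation is a move: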
The completebulkload utility will move generated StoreFiles into an HBase table. This utility is often used in conjunction with output from Section 15.1.10, “ImportTsv”.
https://issues.apache.org/jira/browse/HBASE-9537
https://issues.apache.org/jira/browse/HBASE-8304
./hbase-0.96.0-hadoop1/bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs://namenode/outputtsv gonghui_test
./hbase-0.96.0-hadoop1/bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs://namenode:8020/outputtsv gonghui_test
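The two commands above differ only in the explicit port 8020. The following is a minimal sketch of why that can matter. It assumes that the region server chooses between move and copy by comparing filesystem URIs with equals(), as the bulkLoadStoreFile source quoted in this post does. It also assumes, hypothetically, that HBase's root directory is configured as hdfs://namenode:8020/hbase; that value, the class name FsUriCompare and its helpers are illustrative and do not come from HBase or from the original post.

```java
import java.net.URI;

public class FsUriCompare {
    // Reduce a full path URI to scheme://authority, which is roughly
    // the shape of the URI a filesystem reports for itself.
    static URI fsUri(String path) {
        URI u = URI.create(path);
        return URI.create(u.getScheme() + "://" + u.getAuthority());
    }

    // true  -> URIs are equal, so the file would be renamed (moved)
    // false -> URIs differ, so the file would first be copied
    static boolean sameFilesystem(String srcPath, String destRoot) {
        return fsUri(srcPath).equals(fsUri(destRoot));
    }

    public static void main(String[] args) {
        // Assumed destination root; not taken from the original post.
        String hbaseRoot = "hdfs://namenode:8020/hbase";

        // Source path without a port: the URI differs from the destination.
        System.out.println(sameFilesystem("hdfs://namenode/outputtsv", hbaseRoot));      // false

        // Source path with the same port as the destination: the URIs match.
        System.out.println(sameFilesystem("hdfs://namenode:8020/outputtsv", hbaseRoot)); // true
    }
}
```

Under these assumptions, the first command would take the copy branch even though both paths point at the same cluster, while the second would go straight to the rename.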
LoadIncrementalHFiles.doBulkLoad(Path hfofDir, final HTable table)
--> LoadIncrementalHFiles.bulkLoadPhase(final HTable table, final HConnection conn, ExecutorService pool, Deque<LoadQueueItem> queue, final Multimap<ByteBuffer, LoadQueueItem> regionGroups)
--> LoadIncrementalHFiles.tryAtomicRegionLoad(final HConnection conn, final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
--> ProtobufUtil.bulkLoadHFile(final ClientService.BlockingInterface client, final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum) -->
/**
 * A helper to bulk load a list of HFiles using client protocol.
 *
 * @param client
 * @param familyPaths
 * @param regionName
 * @param assignSeqNum
 * @return true if all are loaded
 * @throws IOException
 */
public static boolean bulkLoadHFile(final ClientService.BlockingInterface client,
    final List<Pair<byte[], String>> familyPaths,
    final byte[] regionName, boolean assignSeqNum) throws IOException {
  BulkLoadHFileRequest request =
    RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum);
  try {
    BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
    return response.getLoaded();
  } catch (ServiceException se) {
    throw getRemoteException(se);
  }
}
HRegionServer --> HRegion.bulkLoadHFiles(...) --> HStore.bulkLoadHFile(...) --> HRegionFileSystem.bulkLoadStoreFile(...)
/**
 * Bulk load: Add a specified store file to the specified family.
 * If the source file is on the same different file-system is moved from the
 * source location to the destination location, otherwise is copied over.
 *
 * @param familyName Family that will gain the file
 * @param srcPath {@link Path} to the file to import
 * @param seqNum Bulk Load sequence number
 * @return The destination {@link Path} of the bulk loaded file
 * @throws IOException
 */
Path bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
    throws IOException {
  // Copy the file if it's on another filesystem
  FileSystem srcFs = srcPath.getFileSystem(conf);
  FileSystem desFs = fs instanceof HFileSystem ? ((HFileSystem)fs).getBackingFs() : fs;

  // We can't compare FileSystem instances as equals() includes UGI instance
  // as part of the comparison and won't work when doing SecureBulkLoad
  // TODO deal with viewFS
  if (!srcFs.getUri().equals(desFs.getUri())) {
    LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " +
        "the destination store. Copying file over to destination filesystem.");
    Path tmpPath = createTempName();
    // Not the same filesystem, so the file is copied
    FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf);
    LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath);
    srcPath = tmpPath;
  }

  return commitStoreFile(familyName, srcPath, seqNum, true);
}
FileUtil.copy(...)
/**
 * Move the file from a build/temp location to the main family store directory.
 * @param familyName Family that will gain the file
 * @param buildPath {@link Path} to the file to commit.
 * @param seqNum Sequence Number to append to the file name (less then 0 if no sequence number)
 * @param generateNewName False if you want to keep the buildPath name
 * @return The new {@link Path} of the committed file
 * @throws IOException
 */
private Path commitStoreFile(final String familyName, final Path buildPath,
    final long seqNum, final boolean generateNewName) throws IOException {
  Path storeDir = getStoreDir(familyName);
  if(!fs.exists(storeDir) && !createDir(storeDir))
    throw new IOException("Failed creating " + storeDir);

  String name = buildPath.getName();
  if (generateNewName) {
    name = generateUniqueName((seqNum < 0) ? null : "_SeqId_" + seqNum + "_");
  }
  Path dstPath = new Path(storeDir, name);
  if (!fs.exists(buildPath)) {
    throw new FileNotFoundException(buildPath.toString());
  }
  LOG.debug("Committing store file " + buildPath + " as " + dstPath);
  // buildPath exists, therefore not doing an exists() check.
  // The rename happens here
  if (!rename(buildPath, dstPath)) {
    throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
  }
  return dstPath;
}
/**
 * Renames a directory. Assumes the user has already checked for this directory existence.
 * @param srcpath
 * @param dstPath
 * @return true if rename is successful.
 * @throws IOException
 */
boolean rename(Path srcpath, Path dstPath) throws IOException {
  IOException lastIOE = null;
  int i = 0;
  do {
    try {
      return fs.rename(srcpath, dstPath);
    } catch (IOException ioe) {
      lastIOE = ioe;
      if (!fs.exists(srcpath) && fs.exists(dstPath)) return true; // successful move
      // dir is not there, retry after some time.
      sleepBeforeRetry("Rename Directory", i+1);
    }
  } while (++i <= hdfsClientRetriesNumber);
  throw new IOException("Exception in rename", lastIOE);
}