HBASE 代码阅读笔记-1 - PUT-2-定位RS和REGION(基于0.94.12)
上一篇http://dennis-lee-gammy.iteye.com/admin/blogs/1972269把put操作的客户端主流程捋了一遍,但是很多比较重要的核心代码还未涉及,会在这一篇或者以后的文章中解析(本篇已完结)
先接着看
HConnectionManager.HConnectionImplementation.processBatchCallback方法吧,主要是完成如何根据tableName和RowKey定位到RS和具体的Region
public HRegionLocation locateRegion(final byte[] tableName, final byte[] row) throws IOException { return locateRegion(tableName, row, true, true); } public HRegionLocation relocateRegion(final byte[] tableName, final byte[] row) throws IOException { // Since this is an explicit request not to use any caching, finding // disabled tables should not be desirable. This will ensure that an exception is thrown when // the first time a disabled table is interacted with. if (isTableDisabled(tableName)) { throw new DoNotRetryIOException(Bytes.toString(tableName) + " is disabled."); } return locateRegion(tableName, row, false, true); } //这里mark一下,标记为【0】 private HRegionLocation locateRegion(final byte[] tableName, final byte[] row, boolean useCache, boolean retry) throws IOException { // 判断链接已经关闭以及tableName是否为空 if (this.closed) throw new IOException(toString() + " closed"); if (tableName == null || tableName.length == 0) { throw new IllegalArgumentException( "table name cannot be null or zero length"); } ensureZookeeperTrackers();//MY TODO // 如果是查询 -ROOT- 表,会有独立的逻辑进行处理,继续mark为【1】 if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) { try { ServerName servername = this.rootRegionTracker.waitRootRegionLocation(this.rpcTimeout); LOG.debug("Looked up root region location, connection=" + this + "; serverName=" + ((servername == null) ? 
"" : servername.toString())); if (servername == null) return null; return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, servername.getHostname(), servername.getPort()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } } // 下面是处理.META.和用户表,流程是完全一致的,通过父表查子表 // 这是一个向上递归的过程,因为首先进入的是用户表,这里mark为【2】 //,进入mark【4】 else if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) { return locateRegionInMeta(HConstants.ROOT_TABLE_NAME, tableName, row, useCache, metaRegionLock, retry); } else { // Region not in the cache - have to go to the meta RS // 这里mark为【3】,进入mark【4】 return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row, useCache, userRegionLock, retry); } }
/**
 * Search one of the meta tables (-ROOT- or .META.) for the HRegionLocation
 * info that contains the table and row we're seeking.
 *
 * @param parentTable      catalog table to query (-ROOT- or .META.)
 * @param tableName        table whose region is being located
 * @param row              row key that the located region must contain
 * @param useCache         when true the location cache is consulted first;
 *                         when false any cached entry for the row is deleted
 * @param regionLockObject monitor that serializes the catalog query
 * @param retry            when false the loop bound is one attempt, otherwise
 *                         {@code numRetries}
 * @return the located (and now cached) region location
 * @throws TableNotFoundException if no catalog row is returned or the row
 *         belongs to a different table
 * @throws IOException on other lookup failures or if the thread is interrupted
 *         while sleeping between attempts
 */
private HRegionLocation locateRegionInMeta(final byte[] parentTable,
    final byte[] tableName, final byte[] row, boolean useCache,
    Object regionLockObject, boolean retry) throws IOException {
  HRegionLocation location;
  // If we are supposed to be using the cache, look in the cache to see if
  // we already have the region.
  if (useCache) {
    location = getCachedLocation(tableName, row);
    if (location != null) {
      return location;
    }
  }
  int localNumRetries = retry ? numRetries : 1;
  // build the key of the meta region we should be looking for.
  // the extra 9's on the end are necessary to allow "exact" matches
  // without knowing the precise region names.
  byte[] metaKey = HRegionInfo.createRegionName(tableName, row,
      HConstants.NINES, false);
  for (int tries = 0; true; tries++) {
    // Give up once the attempt budget is used.
    // NOTE(review): the message reports numRetries although the bound that was
    // actually checked is localNumRetries (1 when retry is false) - confirm
    // whether that is intended.
    if (tries >= localNumRetries) {
      throw new NoServerForRegionException("Unable to find region for "
          + Bytes.toStringBinary(row) + " after " + numRetries + " tries.");
    }
    HRegionLocation metaLocation = null;
    try {
      // locate the root or meta region (re-enters locateRegion for the parent
      // table, with the cache on and retry off)
      metaLocation = locateRegion(parentTable, metaKey, true, false);
      // If null still, go around again.
      if (metaLocation == null) continue;
      // Obtain the RPC handle for the server hosting the parent region. This
      // happens before the second cache check below, so it is done even when
      // that check ends up returning a cached location.
      HRegionInterface server =
          getHRegionConnection(metaLocation.getHostname(), metaLocation.getPort());
      Result regionInfoRow = null;
      // This block guards against two threads trying to load the meta
      // region at the same time. The first will load the meta region and
      // the second will use the value that the first one found.
      synchronized (regionLockObject) {
        // Check the cache again for a hit in case some other thread made the
        // same query while we were waiting on the lock.
        if (useCache) {
          location = getCachedLocation(tableName, row);
          if (location != null) {
            return location;
          }
          // If the parent table is META, we may want to pre-fetch some
          // region info into the global region cache for this table.
          if (Bytes.equals(parentTable, HConstants.META_TABLE_NAME)
              && (getRegionCachePrefetch(tableName))) {
            prefetchRegionCache(tableName, row);
          }
          // Try the cache once more after the prefetch.
          location = getCachedLocation(tableName, row);
          if (location != null) {
            return location;
          }
        } else {
          // If we are not supposed to be using the cache, delete any existing cached location
          // so it won't interfere. relocateRegion reaches this branch.
          deleteCachedLocation(tableName, row);
        }
        // Query the root or meta region for the location of the meta region
        regionInfoRow = server.getClosestRowBefore(
            metaLocation.getRegionInfo().getRegionName(), metaKey,
            HConstants.CATALOG_FAMILY);
      }
      // No catalog row at all: report the table as not found.
      if (regionInfoRow == null) {
        throw new TableNotFoundException(Bytes.toString(tableName));
      }
      // The info:regioninfo cell carries the serialized HRegionInfo.
      byte[] value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
          HConstants.REGIONINFO_QUALIFIER);
      if (value == null || value.length == 0) {
        throw new IOException("HRegionInfo was null or empty in "
            + Bytes.toString(parentTable) + ", row=" + regionInfoRow);
      }
      // convert the row result into the HRegionLocation we need!
      HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable(
          value, new HRegionInfo());
      // possible we got a region of a different table...
      if (!Bytes.equals(regionInfo.getTableName(), tableName)) {
        throw new TableNotFoundException(
            "Table '" + Bytes.toString(tableName) + "' was not found, got: "
            + Bytes.toString(regionInfo.getTableName()) + ".");
      }
      // A split parent cannot serve the row; the caller has to wait for the daughters.
      if (regionInfo.isSplit()) {
        throw new RegionOfflineException("the only available region for"
            + " the required row is a split parent,"
            + " the daughters should be online soon: "
            + regionInfo.getRegionNameAsString());
      }
      // An offline region cannot serve the row either.
      if (regionInfo.isOffline()) {
        throw new RegionOfflineException("the region is offline, could"
            + " be caused by a disable table call: "
            + regionInfo.getRegionNameAsString());
      }
      // The info:server cell carries the hosting server as "host:port".
      value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
          HConstants.SERVER_QUALIFIER);
      String hostAndPort = "";
      if (value != null) {
        hostAndPort = Bytes.toString(value);
      }
      if (hostAndPort.equals("")) {
        throw new NoServerForRegionException("No server address listed "
            + "in " + Bytes.toString(parentTable) + " for region "
            + regionInfo.getRegionNameAsString() + " containing row "
            + Bytes.toStringBinary(row));
      }
      // Instantiate the location
      String hostname = Addressing.parseHostname(hostAndPort);
      int port = Addressing.parsePort(hostAndPort);
      location = new HRegionLocation(regionInfo, hostname, port);
      // Remember the freshly resolved location.
      cacheLocation(tableName, location);
      return location;
    } catch (TableNotFoundException e) {
      // if we got this error, probably means the table just plain doesn't
      // exist. rethrow the error immediately. this should always be coming
      // from the HTable constructor.
      throw e;
    } catch (IOException e) {
      if (e instanceof RemoteException) {
        e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
      }
      // NOTE(review): this compares against numRetries, not localNumRetries, so
      // with retry == false the exception is logged rather than rethrown and the
      // loop-top check throws NoServerForRegionException on the next pass -
      // confirm whether that is intended.
      if (tries < numRetries - 1) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("locateRegionInMeta parentTable="
              + Bytes.toString(parentTable) + ", metaLocation="
              + ((metaLocation == null) ? "null" : "{" + metaLocation + "}")
              + ", attempt=" + tries + " of " + this.numRetries
              + " failed; retrying after sleep of "
              + ConnectionUtils.getPauseTime(this.pause, tries)
              + " because: " + e.getMessage());
        }
      } else {
        throw e;
      }
      // Only relocate the parent region if necessary. relocateRegion performs
      // the lookup with useCache == false, which deletes the cached parent
      // location before querying again.
      if (!(e instanceof RegionOfflineException
          || e instanceof NoServerForRegionException)) {
        relocateRegion(parentTable, metaKey);
      }
    }
    // Back off before the next attempt.
    try {
      Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Giving up trying to location region in "
          + "meta: thread is interrupted.");
    }
  }
}
HRegionLocation getCachedLocation(final byte[] tableName, final byte[] row) { SoftValueSortedMap<byte[], HRegionLocation> tableLocations = getTableLocations(tableName); // 这里仅仅是根据tablename把缓存的该table的region信息都拿出来 // start to examine the cache. we can only do cache actions // if there's something in the cache for this table. if (tableLocations.isEmpty()) { return null; } // 如果已经有缓存的region信息,再直接用当前rowkey当key取换取region信息 // region的存储索引格式为[startKey,endKey),缓存里面存的rowkey为startkey // 所以这一步的命中率其实是非常低的 HRegionLocation possibleRegion = tableLocations.get(row); if (possibleRegion != null) { return possibleRegion; } // 直接获取失败,在返回一个key严格小于当前rowkey的region信息,即startKey < rowkey // 如果所有的starkKey都比rowkey大,那就说明当前rowkey对应的region信息没有被缓存 // 因为region的存储索引格式为[startKey,endKey) possibleRegion = tableLocations.lowerValueByKey(row); if (possibleRegion == null) { return null; } // make sure that the end key is greater than the row we're looking // for, otherwise the row actually belongs in the next region, not // this one. the exception case is when the endkey is // HConstants.EMPTY_END_ROW, signifying that the region we're // checking is actually the last region in the table. 
// 如果拿到一个startKey < rowkey // 则当前rowkey就有可能在当前region中,再检查rowkey < endKey 即可 // 因为region的存储索引格式为[startKey,endKey) byte[] endKey = possibleRegion.getRegionInfo().getEndKey(); if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) || KeyValue.getRowComparator(tableName).compareRows( endKey, 0, endKey.length, row, 0, row.length) > 0) { return possibleRegion; } // Passed all the way through, so we got nothin - complete cache miss return null; } // 读取该table所有的region信息(已缓存的) private SoftValueSortedMap<byte[], HRegionLocation> getTableLocations( final byte[] tableName) { // find the map of cached locations for this table Integer key = Bytes.mapKey(tableName);//hash一下 SoftValueSortedMap<byte[], HRegionLocation> result; synchronized (this.cachedRegionLocations) { result = this.cachedRegionLocations.get(key); // if tableLocations for this table isn't built yet, make one if (result == null) { result = new SoftValueSortedMap<byte[], HRegionLocation>( Bytes.BYTES_COMPARATOR); this.cachedRegionLocations.put(key, result);//没有的话先创建好缓存对象,然后以当前rowkey为key先塞进去 } } return result; }
private void cacheLocation(final byte[] tableName, final HRegionLocation location) { byte[] startKey = location.getRegionInfo().getStartKey(); Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);//还是先把已经该table已经缓存的region信息拿出来 boolean hasNewCache = false; synchronized (this.cachedRegionLocations) { cachedServers.add(location.getHostnamePort()); hasNewCache = (tableLocations.put(startKey, location) == null); } if (hasNewCache) { LOG.debug("Cached location for " + location.getRegionInfo().getRegionNameAsString() + " is " + location.getHostnamePort()); } } //SoftValueSortedMap<K, V> implements SortedMap<K, V> 提供,key就不说了,value是使用软引用实现的, public V put(K key, V value) { synchronized (sync) { checkReferences(); SoftValue<K, V> oldValue = this.internalMap.put(key, new SoftValue<K, V>(key, value, this.rq)); return oldValue == null ? null : oldValue.get(); } } // 软引用会在GC的时候在没有其他强引用的时候可能被清除,可以提供一个引用队列, // 如果软引用所引用的对象被垃圾回收器回收,Java虚拟机就会把这个软引用加入到与之关联的引用队列中。 // 有关强引用、软引用、若引用以及幽灵引用和GC的知识这里就不关注了先 // 这里就是检查已经被GC的缓存的对象,然后清除 private int checkReferences() { int i = 0; for (Reference<? extends V> ref; (ref = this.rq.poll()) != null; ) { i++; this.internalMap.remove(((SoftValue<K, V>) ref).key); } return i; }
void deleteCachedLocation(final byte[] tableName, final byte[] row) {? ? ? ? ? ? synchronized (this.cachedRegionLocations) {? ? ? ? ? ? ? ? Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);? ? ? ? ? ? ? ? if (!tableLocations.isEmpty()) {? ? ? ? ? ? ? ? ? ? // start to examine the cache. we can only do cache actions? ? ? ? ? ? ? ? ? ? // if there's something in the cache for this table.? ? ? ? ? ? ? ? ? ? HRegionLocation rl = getCachedLocation(tableName, row);? ? ? ? ? ? ? ? ? ? if (rl != null) {? ? ? ? ? ? ? ? ? ? ? ? tableLocations.remove(rl.getRegionInfo().getStartKey());? ? ? ? ? ? ? ? ? ? ? ? if (LOG.isDebugEnabled()) {? ? ? ? ? ? ? ? ? ? ? ? ? ? LOG.debug("Removed " +? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? rl.getRegionInfo().getRegionNameAsString() +? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? " for tableName=" + Bytes.toString(tableName) +? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? " from cache " + "because of " + Bytes.toStringBinary(row));? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? }
// 如果父表是META表(当前为用户表),则将该表的一些信息预抓取到全局缓存中
private void prefetchRegionCache(final byte[] tableName, final byte[] row) { // Implement a new visitor for MetaScanner, and use it to walk through // the .META. // 生成一个MetaScannerVisitor用来遍历META表 MetaScannerVisitor visitor = new MetaScannerVisitorBase() { public boolean processRow(Result result) throws IOException { try { // info:regioninfo // 这里的流程跟之前获取用户表的region信息几乎是一模一样的 // 从value中获取region信息,然后检查region对应表是否正确,是否下线。然后缓存起来 // 不过目测没有检查是否在做split,META表是可以被split的,不明为啥,求解释 byte[] value = result.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); HRegionInfo regionInfo = null; if (value != null) { // convert the row result into the HRegionLocation we need! regionInfo = Writables.getHRegionInfo(value); // possible we got a region of a different table... if (!Bytes.equals(regionInfo.getTableName(), tableName)) { return false; // stop scanning } if (regionInfo.isOffline()) { // don't cache offline regions return true; } value = result.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);//info:server if (value == null) { return true; // don't cache it } final String hostAndPort = Bytes.toString(value); String hostname = Addressing.parseHostname(hostAndPort); int port = Addressing.parsePort(hostAndPort); value = result.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);//info:serverStartCode,我去,这里取出来了怎么没用呢? // instantiate the location HRegionLocation loc = new HRegionLocation(regionInfo, hostname, port); // cache this meta entry cacheLocation(tableName, loc); } return true; } catch (RuntimeException e) { throw new IOException(e); } } }; try { // pre-fetch certain number of regions info at region cache. MetaScanner.metaScan(conf, this, visitor, tableName, row, this.prefetchRegionLimit, HConstants.META_TABLE_NAME); } catch (IOException e) { LOG.warn("Encountered problems when prefetch META table: ", e); } }
public static void metaScan(Configuration configuration, HConnection connection, MetaScannerVisitor visitor, byte[] tableName, byte[] row, int rowLimit, final byte[] metaTableName) throws IOException { HTable metaTable = null; try { if (connection == null) { metaTable = new HTable(configuration, HConstants.META_TABLE_NAME, null); } else { metaTable = new HTable(HConstants.META_TABLE_NAME, connection, null); } int rowUpperLimit = rowLimit > 0 ? rowLimit : Integer.MAX_VALUE; // if row is not null, we want to use the startKey of the row's region as // the startRow for the meta scan. byte[] startRow; if (row != null) { // Scan starting at a particular row in a particular table assert tableName != null; byte[] searchRow = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false); Result startRowResult = metaTable.getRowOrBefore(searchRow, HConstants.CATALOG_FAMILY); if (startRowResult == null) { throw new TableNotFoundException("Cannot find row in .META. for table: " + Bytes.toString(tableName) + ", row=" + Bytes.toStringBinary(searchRow)); } byte[] value = startRowResult.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); if (value == null || value.length == 0) { throw new IOException("HRegionInfo was null or empty in Meta for " + Bytes.toString(tableName) + ", row=" + Bytes.toStringBinary(searchRow)); } HRegionInfo regionInfo = Writables.getHRegionInfo(value); byte[] rowBefore = regionInfo.getStartKey(); startRow = HRegionInfo.createRegionName(tableName, rowBefore, HConstants.ZEROES, false); } else if (tableName == null || tableName.length == 0) { // Full META scan startRow = HConstants.EMPTY_START_ROW; } else { // Scan META for an entire table startRow = HRegionInfo.createRegionName( tableName, HConstants.EMPTY_START_ROW, HConstants.ZEROES, false); } // Scan over each meta region ScannerCallable callable; int rows = Math.min(rowLimit, configuration.getInt( HConstants.HBASE_META_SCANNER_CACHING, 
HConstants.DEFAULT_HBASE_META_SCANNER_CACHING)); do { final Scan scan = new Scan(startRow).addFamily(HConstants.CATALOG_FAMILY); if (LOG.isDebugEnabled()) { LOG.debug("Scanning " + Bytes.toString(metaTableName) + " starting at row=" + Bytes.toStringBinary(startRow) + " for max=" + rowUpperLimit + " rows using " + metaTable.getConnection().toString()); } callable = new ScannerCallable(metaTable.getConnection(), metaTableName, scan, null); // Open scanner callable.withRetries(); int processedRows = 0; try { callable.setCaching(rows); done: do { if (processedRows >= rowUpperLimit) { break; } //we have all the rows here Result[] rrs = callable.withRetries(); if (rrs == null || rrs.length == 0 || rrs[0].size() == 0) { break; //exit completely } for (Result rr : rrs) { if (processedRows >= rowUpperLimit) { break done; } if (!visitor.processRow(rr)) break done; //exit completely processedRows++; } //here, we didn't break anywhere. Check if we have more rows } while (true); // Advance the startRow to the end key of the current region startRow = callable.getHRegionInfo().getEndKey(); } finally { // Close scanner callable.setClose(); callable.withRetries(); } } while (Bytes.compareTo(startRow, HConstants.LAST_ROW) != 0); } finally { visitor.close(); if (metaTable != null) { metaTable.close(); } } }
public HRegionInterface getHRegionConnection(final String hostname, final int port) throws IOException { return getHRegionConnection(hostname, port, false); } public HRegionInterface getHRegionConnection(final String hostname, final int port, final boolean master) throws IOException { return getHRegionConnection(hostname, port, null, master); } HRegionInterface getHRegionConnection(final String hostname, final int port, final InetSocketAddress isa, final boolean master) throws IOException { if (master) getMaster(); HRegionInterface server; String rsName = null; if (isa != null) { rsName = Addressing.createHostAndPortStr(isa.getHostName(), isa.getPort()); } else { rsName = Addressing.createHostAndPortStr(hostname, port); } ensureZookeeperTrackers(); // See if we already have a connection (common case) // 这是一个HRegionInterface server = this.servers.get(rsName); if (server == null) { // create a unique lock for this RS (if necessary) this.connectionLock.putIfAbsent(rsName, rsName); // get the RS lock synchronized (this.connectionLock.get(rsName)) { // 双锁检查,很严谨 // do one more lookup in case we were stalled above server = this.servers.get(rsName); if (server == null) { try { // Only create isa when we need to. InetSocketAddress address = isa != null ? isa : new InetSocketAddress(hostname, port); // definitely a cache miss. establish an RPC for this RS // 关键的代码在这里,标记一下吧,研究到RPC的时候再继续了 server = HBaseRPC.waitForProxy(this.rpcEngine, serverInterfaceClass, HRegionInterface.VERSION, address, this.conf, this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout); this.servers.put(Addressing.createHostAndPortStr( address.getHostName(), address.getPort()), server); } catch (RemoteException e) { LOG.warn("RemoteException connecting to RS", e); // Throw what the RemoteException was carrying. throw e.unwrapRemoteException(); } } } } return server; }