首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 企业软件 > 行业软件 >

HBASE 代码阅览笔记-1 - PUT-2-定位RS和REGION(基于0.94.12)

2013-11-16 
HBASE 代码阅读笔记-1 - PUT-2-定位RS和REGION(基于0.94.12)上一篇http://dennis-lee-gammy.iteye.com/adm

HBASE 代码阅读笔记-1 - PUT-2-定位RS和REGION(基于0.94.12)
上一篇http://dennis-lee-gammy.iteye.com/admin/blogs/1972269把put操作的客户端主流程捋了一遍,但是很多比较重要的核心代码还未涉及,会在这一篇或者以后的文章中解析(本篇已完结)

先接着看
HConnectionManager.HConnectionImplementation.processBatchCallback方法吧,主要是完成如何根据tableName和RowKey定位到RS和具体的R

        public HRegionLocation locateRegion(final byte[] tableName,                                            final byte[] row)                throws IOException {            return locateRegion(tableName, row, true, true);        }        public HRegionLocation relocateRegion(final byte[] tableName,                                              final byte[] row)                throws IOException {            // Since this is an explicit request not to use any caching, finding            // disabled tables should not be desirable.  This will ensure that an exception is thrown when            // the first time a disabled table is interacted with.            if (isTableDisabled(tableName)) {                throw new DoNotRetryIOException(Bytes.toString(tableName) + " is disabled.");            }            return locateRegion(tableName, row, false, true);        }        //这里mark一下,标记为【0】        private HRegionLocation locateRegion(final byte[] tableName,                                             final byte[] row, boolean useCache, boolean retry)                throws IOException {            // 判断链接已经关闭以及tableName是否为空            if (this.closed) throw new IOException(toString() + " closed");            if (tableName == null || tableName.length == 0) {                throw new IllegalArgumentException(                        "table name cannot be null or zero length");            }            ensureZookeeperTrackers();//MY TODO            // 如果是查询 -ROOT- 表,会有独立的逻辑进行处理,继续mark为【1】            if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {                try {                    ServerName servername = this.rootRegionTracker.waitRootRegionLocation(this.rpcTimeout);                    LOG.debug("Looked up root region location, connection=" + this +                            "; serverName=" + ((servername == null) ? "" : servername.toString()));                    if (servername == null) return null;                    return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO,                            servername.getHostname(), servername.getPort());                } catch (InterruptedException e) {                    Thread.currentThread().interrupt();                    return null;                }            }             // 下面是处理.META.和用户表,流程是完全一致的,通过父表查子表            // 这是一个向上递归的过程,因为首先进入的是用户表,这里mark为【2】            //,进入mark【4】            else if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {                return locateRegionInMeta(HConstants.ROOT_TABLE_NAME, tableName, row,                        useCache, metaRegionLock, retry);            } else {                // Region not in the cache - have to go to the meta RS                // 这里mark为【3】,进入mark【4】                return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row,                        useCache, userRegionLock, retry);            }        } 


-ROOT- 表的定位先放一放,咱们就按实际的处理顺序来看看代码。像幼儿园的小学生一样掰着手指头捋一遍。
首先,进入流程的是用户表,locateRegionInMeta方法负责读取用户表和.META.表的信息。
流程进入mark0,然后判断为非ROOT表和META表,进入mark3

         /*          * Search one of the meta tables (-ROOT- or .META.) for the HRegionLocation          * info that contains the table and row we're seeking.          */        private HRegionLocation locateRegionInMeta(final byte[] parentTable,                                                   final byte[] tableName, final byte[] row, boolean useCache,                                                   Object regionLockObject, boolean retry)                throws IOException {            HRegionLocation location;            // If we are supposed to be using the cache, look in the cache to see if            // we already have the region.            // 如果我们已经有了缓存,则直接从缓存中查询,当然,缓存是会失效的            // 比如遇到region的分割、合并、balance,甚至RS当机            if (useCache) { // 这里mark为【4】                location = getCachedLocation(tableName, row);                if (location != null) {                    return location;                }            }            int localNumRetries = retry ? numRetries : 1;            // build the key of the meta region we should be looking for.            // the extra 9's on the end are necessary to allow "exact" matches            // without knowing the precise region names.            // 创建一个key来查询region的meta信息,            // 用9999999999做id是用于在不知道精确的regionname            // 的情况下来定位region(目测暂时不知道为什么这么多9可行,请高手提示)            byte[] metaKey = HRegionInfo.createRegionName(tableName, row,                    HConstants.NINES, false);            for (int tries = 0; true; tries++) {                if (tries >= localNumRetries) {                    throw new NoServerForRegionException("Unable to find region for "                            + Bytes.toStringBinary(row) + " after " + numRetries + " tries.");                }                // 重试多次仍然无法获取数据则抛出异常                HRegionLocation metaLocation = null;                try {                    // locate the root or meta region                    // 定位父表(ROOT或者META,回到mark【0】)                    metaLocation = locateRegion(parentTable, metaKey, true, false);                    // If null still, go around again.没拿到信息,重试                    if (metaLocation == null) continue;                    // 拿到信息,来一个远程调用吧(备注【i】)                    HRegionInterface server =                            getHRegionConnection(metaLocation.getHostname(), metaLocation.getPort());                    Result regionInfoRow = null;                    // This block guards against two threads trying to load the meta                    // region at the same time. The first will load the meta region and                    // the second will use the value that the first one found.                    synchronized (regionLockObject) {                        // Check the cache again for a hit in case some other thread made the                        // same query while we were waiting on the lock.                        // 类似于一个单例模式的双锁检查                        // 如果我们在等锁的过程中,已经有其他                        // 的线程完成了定位工作并将其放入缓存,那么直接读取缓存                        if (useCache) {                            location = getCachedLocation(tableName, row);                            if (location != null) {                                return location;                            }                            // If the parent table is META, we may want to pre-fetch some                            // region info into the global region cache for this table.                            // 如果父表是META表(当前为用户表),则将该表的一些信息预抓取到全局缓存中                            if (Bytes.equals(parentTable, HConstants.META_TABLE_NAME)                                    && (getRegionCachePrefetch(tableName))) {                                prefetchRegionCache(tableName, row);//MY TODO                            }                            //完事了再试一遍缓存                            location = getCachedLocation(tableName, row);                            if (location != null) {                                return location;                            }                        } else {                            // If we are not supposed to be using the cache, delete any existing cached location                            // so it won't interfere.                            // 如果不用缓存就删了,region重定位的时候会走到这里                            deleteCachedLocation(tableName, row);                        }                        // Query the root or meta region for the location of the meta region                        // 这里才是正儿八经的远程调用获取信息。                        // MS是坑了,备注【i】处的操作可以放到这里,                        // 在缓存获取成功的情况下,可以省不少不必要的事。备注【ii】,读操作,后续再看                        regionInfoRow = server.getClosestRowBefore(                                metaLocation.getRegionInfo().getRegionName(), metaKey,                                HConstants.CATALOG_FAMILY);                    }                    // 拿不到信息只能异常了                    if (regionInfoRow == null) {                        throw new TableNotFoundException(Bytes.toString(tableName));                    }                    // info:regionInfo,value就是当前需要获取的region的信息                    byte[] value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,                            HConstants.REGIONINFO_QUALIFIER);                    if (value == null || value.length == 0) {                        throw new IOException("HRegionInfo was null or empty in " +                                Bytes.toString(parentTable) + ", row=" + regionInfoRow);                    }                    // convert the row result into the HRegionLocation we need!                    HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable(                            value, new HRegionInfo());                    // possible we got a region of a different table...                    // 还有可能拿到的region信息跟不属于我们查找的表?                    // 还有这种情况?再次不明觉厉                    if (!Bytes.equals(regionInfo.getTableName(), tableName)) {                        throw new TableNotFoundException(                                "Table '" + Bytes.toString(tableName) + "' was not found, got: " +                                        Bytes.toString(regionInfo.getTableName()) + ".");                    }                    // 如果region正在split,而两个daughter region还未上线,                    // 那只能说再见了                    if (regionInfo.isSplit()) {                        throw new RegionOfflineException("the only available region for" +                                " the required row is a split parent," +                                " the daughters should be online soon: " +                                regionInfo.getRegionNameAsString());                    }                    // region下线,也只能说88                      if (regionInfo.isOffline()) {                        throw new RegionOfflineException("the region is offline, could" +                                " be caused by a disable table call: " +                                regionInfo.getRegionNameAsString());                    }                    // info:server,拿到RS信息                    value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,                            HConstants.SERVER_QUALIFIER);                    String hostAndPort = "";                    if (value != null) {                        hostAndPort = Bytes.toString(value);                    }                    if (hostAndPort.equals("")) {                        throw new NoServerForRegionException("No server address listed " +                                "in " + Bytes.toString(parentTable) + " for region " +                                regionInfo.getRegionNameAsString() + " containing row " +                                Bytes.toStringBinary(row));                    }                    // Instantiate the location                    String hostname = Addressing.parseHostname(hostAndPort);                    int port = Addressing.parsePort(hostAndPort);                    location = new HRegionLocation(regionInfo, hostname, port);                    cacheLocation(tableName, location);//缓存当前region信息                    return location;                } catch (TableNotFoundException e) {                    // if we got this error, probably means the table just plain doesn't                    // exist. rethrow the error immediately. this should always be coming                    // from the HTable constructor.                    throw e;                } catch (IOException e) {                    if (e instanceof RemoteException) {                        e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);                    }                    if (tries < numRetries - 1) {                        if (LOG.isDebugEnabled()) {                            LOG.debug("locateRegionInMeta parentTable=" +                                    Bytes.toString(parentTable) + ", metaLocation=" +                                    ((metaLocation == null) ? "null" : "{" + metaLocation + "}") +                                    ", attempt=" + tries + " of " +                                    this.numRetries + " failed; retrying after sleep of " +                                    ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());                        }                    } else {                        throw e;                    }                    // Only relocate the parent region if necessary                    if (!(e instanceof RegionOfflineException ||                            e instanceof NoServerForRegionException)) {                        relocateRegion(parentTable, metaKey);                         //如果不是region下线或者找不到RS,都重试,根据上面展示的代码,重试的时候会将usecache置为false,以达到清理错误缓存信息的目的                    }                }                try {                    Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));                } catch (InterruptedException e) {                    Thread.currentThread().interrupt();                    throw new IOException("Giving up trying to location region in " +                            "meta: thread is interrupted.");                }            }        }


定位region的主流程到这里就完结了,整个是一个向上递归的过程通过user表-meta表-root表-meta表-user表的流程。每一次都会尝试从缓存里面获取信息,并根据处理结果缓存信息或者清理失效的老信息。

接着看一下如何从缓存中获取信息吧
     HRegionLocation getCachedLocation(final byte[] tableName,                                          final byte[] row) {            SoftValueSortedMap<byte[], HRegionLocation> tableLocations =                    getTableLocations(tableName);            // 这里仅仅是根据tablename把缓存的该table的region信息都拿出来              // start to examine the cache. we can only do cache actions            // if there's something in the cache for this table.            if (tableLocations.isEmpty()) {                return null;            }            // 如果已经有缓存的region信息,再直接用当前rowkey当key取换取region信息            // region的存储索引格式为[startKey,endKey),缓存里面存的rowkey为startkey            // 所以这一步的命中率其实是非常低的            HRegionLocation possibleRegion = tableLocations.get(row);            if (possibleRegion != null) {                return possibleRegion;            }            // 直接获取失败,在返回一个key严格小于当前rowkey的region信息,即startKey < rowkey            // 如果所有的starkKey都比rowkey大,那就说明当前rowkey对应的region信息没有被缓存            // 因为region的存储索引格式为[startKey,endKey)            possibleRegion = tableLocations.lowerValueByKey(row);            if (possibleRegion == null) {                return null;            }            // make sure that the end key is greater than the row we're looking            // for, otherwise the row actually belongs in the next region, not            // this one. the exception case is when the endkey is            // HConstants.EMPTY_END_ROW, signifying that the region we're            // checking is actually the last region in the table.            // 如果拿到一个startKey < rowkey            // 则当前rowkey就有可能在当前region中,再检查rowkey < endKey 即可            // 因为region的存储索引格式为[startKey,endKey)             byte[] endKey = possibleRegion.getRegionInfo().getEndKey();            if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||                    KeyValue.getRowComparator(tableName).compareRows(                            endKey, 0, endKey.length, row, 0, row.length) > 0) {                return possibleRegion;            }            // Passed all the way through, so we got nothin - complete cache miss            return null;        }        // 读取该table所有的region信息(已缓存的)        private SoftValueSortedMap<byte[], HRegionLocation> getTableLocations(                final byte[] tableName) {            // find the map of cached locations for this table            Integer key = Bytes.mapKey(tableName);//hash一下            SoftValueSortedMap<byte[], HRegionLocation> result;            synchronized (this.cachedRegionLocations) {                result = this.cachedRegionLocations.get(key);                // if tableLocations for this table isn't built yet, make one                if (result == null) {                    result = new SoftValueSortedMap<byte[], HRegionLocation>(                            Bytes.BYTES_COMPARATOR);                    this.cachedRegionLocations.put(key, result);//没有的话先创建好缓存对象,然后以当前rowkey为key先塞进去                }            }            return result;        }


读取缓存的就完事了,然后看看写入和删除

先看看写入吧
      private void cacheLocation(final byte[] tableName,                                   final HRegionLocation location) {            byte[] startKey = location.getRegionInfo().getStartKey();            Map<byte[], HRegionLocation> tableLocations =                    getTableLocations(tableName);//还是先把已经该table已经缓存的region信息拿出来            boolean hasNewCache = false;            synchronized (this.cachedRegionLocations) {                cachedServers.add(location.getHostnamePort());                hasNewCache = (tableLocations.put(startKey, location) == null);            }            if (hasNewCache) {                LOG.debug("Cached location for " +                        location.getRegionInfo().getRegionNameAsString() +                        " is " + location.getHostnamePort());            }        }    //SoftValueSortedMap<K, V> implements SortedMap<K, V> 提供,key就不说了,value是使用软引用实现的,       public V put(K key, V value) {        synchronized (sync) {            checkReferences();            SoftValue<K, V> oldValue = this.internalMap.put(key,                    new SoftValue<K, V>(key, value, this.rq));            return oldValue == null ? null : oldValue.get();        }    }    // 软引用会在GC的时候在没有其他强引用的时候可能被清除,可以提供一个引用队列,    // 如果软引用所引用的对象被垃圾回收器回收,Java虚拟机就会把这个软引用加入到与之关联的引用队列中。    // 有关强引用、软引用、若引用以及幽灵引用和GC的知识这里就不关注了先    // 这里就是检查已经被GC的缓存的对象,然后清除    private int checkReferences() {        int i = 0;        for (Reference<? extends V> ref; (ref = this.rq.poll()) != null; ) {            i++;            this.internalMap.remove(((SoftValue<K, V>) ref).key);        }        return i;    }


写入这就完事了,最后再看看删除,其实就是调用一遍读流程,不为空的话则删除
      void deleteCachedLocation(final byte[] tableName, final byte[] row) {? ? ? ? ? ? synchronized (this.cachedRegionLocations) {? ? ? ? ? ? ? ? Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);? ? ? ? ? ? ? ? if (!tableLocations.isEmpty()) {? ? ? ? ? ? ? ? ? ? // start to examine the cache. we can only do cache actions? ? ? ? ? ? ? ? ? ? // if there's something in the cache for this table.? ? ? ? ? ? ? ? ? ? HRegionLocation rl = getCachedLocation(tableName, row);? ? ? ? ? ? ? ? ? ? if (rl != null) {? ? ? ? ? ? ? ? ? ? ? ? tableLocations.remove(rl.getRegionInfo().getStartKey());? ? ? ? ? ? ? ? ? ? ? ? if (LOG.isDebugEnabled()) {? ? ? ? ? ? ? ? ? ? ? ? ? ? LOG.debug("Removed " +? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? rl.getRegionInfo().getRegionNameAsString() +? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? " for tableName=" + Bytes.toString(tableName) +? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? " from cache " + "because of " + Bytes.toStringBinary(row));? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? }


最后还有一个小尾巴,先看看上面的注释:
// 如果父表是META表(当前为用户表),则将该表的一些信息预抓取到全局缓存中


好吧,这个尾巴其实不小
private void prefetchRegionCache(final byte[] tableName,                                         final byte[] row) {            // Implement a new visitor for MetaScanner, and use it to walk through            // the .META.            // 生成一个MetaScannerVisitor用来遍历META表            MetaScannerVisitor visitor = new MetaScannerVisitorBase() {                public boolean processRow(Result result) throws IOException {                    try {                        // info:regioninfo                        // 这里的流程跟之前获取用户表的region信息几乎是一模一样的                        // 从value中获取region信息,然后检查region对应表是否正确,是否下线。然后缓存起来                        // 不过目测没有检查是否在做split,META表是可以被split的,不明为啥,求解释                        byte[] value = result.getValue(HConstants.CATALOG_FAMILY,                                HConstants.REGIONINFO_QUALIFIER);                        HRegionInfo regionInfo = null;                        if (value != null) {                            // convert the row result into the HRegionLocation we need!                            regionInfo = Writables.getHRegionInfo(value);                            // possible we got a region of a different table...                            if (!Bytes.equals(regionInfo.getTableName(),                                    tableName)) {                                return false; // stop scanning                            }                            if (regionInfo.isOffline()) {                                // don't cache offline regions                                return true;                            }                            value = result.getValue(HConstants.CATALOG_FAMILY,                                    HConstants.SERVER_QUALIFIER);//info:server                            if (value == null) {                                return true;  // don't cache it                            }                            final String hostAndPort = Bytes.toString(value);                            String hostname = Addressing.parseHostname(hostAndPort);                            int port = Addressing.parsePort(hostAndPort);                            value = result.getValue(HConstants.CATALOG_FAMILY,                                    HConstants.STARTCODE_QUALIFIER);//info:serverStartCode,我去,这里取出来了怎么没用呢?                            // instantiate the location                            HRegionLocation loc =                                    new HRegionLocation(regionInfo, hostname, port);                            // cache this meta entry                            cacheLocation(tableName, loc);                        }                        return true;                    } catch (RuntimeException e) {                        throw new IOException(e);                    }                }            };            try {                // pre-fetch certain number of regions info at region cache.                MetaScanner.metaScan(conf, this, visitor, tableName, row,                        this.prefetchRegionLimit, HConstants.META_TABLE_NAME);            } catch (IOException e) {                LOG.warn("Encountered problems when prefetch META table: ", e);            }        }

看看这个MetaScannerVisitor 在整个scan过程中起了什么作用呢?MetaScanner.metaScan。
代码量看着不小,其实就是scan了一遍meta表,每扫描到一个rowkey就调用一次visitor。
作为一个客户端scan的过程,扫描游标大小同样受hbase.client.prefetch.limit配置限制,默认是10。
扫描的过程就是一个非常标准的读过程,这里不再继续深究了,等到研究读代码的时候再说
   public static void metaScan(Configuration configuration, HConnection connection,                                MetaScannerVisitor visitor, byte[] tableName, byte[] row,                                int rowLimit, final byte[] metaTableName)            throws IOException {        HTable metaTable = null;        try {            if (connection == null) {                metaTable = new HTable(configuration, HConstants.META_TABLE_NAME, null);            } else {                metaTable = new HTable(HConstants.META_TABLE_NAME, connection, null);            }            int rowUpperLimit = rowLimit > 0 ? rowLimit : Integer.MAX_VALUE;            // if row is not null, we want to use the startKey of the row's region as            // the startRow for the meta scan.            byte[] startRow;            if (row != null) {                // Scan starting at a particular row in a particular table                assert tableName != null;                byte[] searchRow =                        HRegionInfo.createRegionName(tableName, row, HConstants.NINES,                                false);                Result startRowResult = metaTable.getRowOrBefore(searchRow,                        HConstants.CATALOG_FAMILY);                if (startRowResult == null) {                    throw new TableNotFoundException("Cannot find row in .META. for table: "                            + Bytes.toString(tableName) + ", row=" + Bytes.toStringBinary(searchRow));                }                byte[] value = startRowResult.getValue(HConstants.CATALOG_FAMILY,                        HConstants.REGIONINFO_QUALIFIER);                if (value == null || value.length == 0) {                    throw new IOException("HRegionInfo was null or empty in Meta for " +                            Bytes.toString(tableName) + ", row=" + Bytes.toStringBinary(searchRow));                }                HRegionInfo regionInfo = Writables.getHRegionInfo(value);                byte[] rowBefore = regionInfo.getStartKey();                startRow = HRegionInfo.createRegionName(tableName, rowBefore,                        HConstants.ZEROES, false);            } else if (tableName == null || tableName.length == 0) {                // Full META scan                startRow = HConstants.EMPTY_START_ROW;            } else {                // Scan META for an entire table                startRow = HRegionInfo.createRegionName(                        tableName, HConstants.EMPTY_START_ROW, HConstants.ZEROES, false);            }            // Scan over each meta region            ScannerCallable callable;            int rows = Math.min(rowLimit, configuration.getInt(                    HConstants.HBASE_META_SCANNER_CACHING,                    HConstants.DEFAULT_HBASE_META_SCANNER_CACHING));            do {                final Scan scan = new Scan(startRow).addFamily(HConstants.CATALOG_FAMILY);                if (LOG.isDebugEnabled()) {                    LOG.debug("Scanning " + Bytes.toString(metaTableName) +                            " starting at row=" + Bytes.toStringBinary(startRow) + " for max=" +                            rowUpperLimit + " rows using " + metaTable.getConnection().toString());                }                callable = new ScannerCallable(metaTable.getConnection(), metaTableName, scan, null);                // Open scanner                callable.withRetries();                int processedRows = 0;                try {                    callable.setCaching(rows);                    done:                    do {                        if (processedRows >= rowUpperLimit) {                            break;                        }                        //we have all the rows here                        Result[] rrs = callable.withRetries();                        if (rrs == null || rrs.length == 0 || rrs[0].size() == 0) {                            break; //exit completely                        }                        for (Result rr : rrs) {                            if (processedRows >= rowUpperLimit) {                                break done;                            }                            if (!visitor.processRow(rr))                                break done; //exit completely                            processedRows++;                        }                        //here, we didn't break anywhere. Check if we have more rows                    } while (true);                    // Advance the startRow to the end key of the current region                    startRow = callable.getHRegionInfo().getEndKey();                } finally {                    // Close scanner                    callable.setClose();                    callable.withRetries();                }            } while (Bytes.compareTo(startRow, HConstants.LAST_ROW) != 0);        } finally {            visitor.close();            if (metaTable != null) {                metaTable.close();            }        }    }



于是乎,region的定位代码这里就OK了。

补充一段备注处的代码,看看是否值得将那里的操作后置
        public HRegionInterface getHRegionConnection(final String hostname,                                                     final int port)                throws IOException {            return getHRegionConnection(hostname, port, false);        }        public HRegionInterface getHRegionConnection(final String hostname,                                                     final int port, final boolean master)                throws IOException {            return getHRegionConnection(hostname, port, null, master);        }        HRegionInterface getHRegionConnection(final String hostname, final int port,                                              final InetSocketAddress isa, final boolean master)                throws IOException {            if (master) getMaster();            HRegionInterface server;            String rsName = null;            if (isa != null) {                rsName = Addressing.createHostAndPortStr(isa.getHostName(),                        isa.getPort());            } else {                rsName = Addressing.createHostAndPortStr(hostname, port);            }            ensureZookeeperTrackers();            // See if we already have a connection (common case)            // 这是一个HRegionInterface            server = this.servers.get(rsName);            if (server == null) {                // create a unique lock for this RS (if necessary)                this.connectionLock.putIfAbsent(rsName, rsName);                // get the RS lock                synchronized (this.connectionLock.get(rsName)) {                    // 双锁检查,很严谨                     // do one more lookup in case we were stalled above                    server = this.servers.get(rsName);                    if (server == null) {                        try {                            // Only create isa when we need to.                            InetSocketAddress address = isa != null ? isa :                                    new InetSocketAddress(hostname, port);                            // definitely a cache miss. establish an RPC for this RS                            // 关键的代码在这里,标记一下吧,研究到RPC的时候再继续了                            server = HBaseRPC.waitForProxy(this.rpcEngine,                                    serverInterfaceClass, HRegionInterface.VERSION,                                    address, this.conf,                                    this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);                            this.servers.put(Addressing.createHostAndPortStr(                                    address.getHostName(), address.getPort()), server);                        } catch (RemoteException e) {                            LOG.warn("RemoteException connecting to RS", e);                            // Throw what the RemoteException was carrying.                            throw e.unwrapRemoteException();                        }                    }                }            }            return server;        }

热点排行