HBASE 代码阅读笔记-1 - PUT-3-提交任务1(基于0.94.12)
终于把RS的定位问题搞清楚了些些,时间不等人,马上看看connection.processBatch中,step2是如何把任务提交到服务端的吧
之前已经看到,首先创建了一个Callable<MuiltyResponse>对象,而该对象的call方法实际上又创建了一个ServerCallable<MultiResponse> 对象,然后调用了它的withoutRetries方法。
这个方法很简单,调用了connect方法和multi方法
一个个开始啃吧,先看看connect,其中server是ServerCallable的成员,HRegionInterface类型
好吧,原来rpcClient.getProxy才是重头戏,一次次的浇熄哥接近终点的热情
RPCEgine:WritableRpcEngine,你也可以通过hbase.rpc.engine配置一个全限定的类名来覆盖它
Invocation类其实主要是将要执行的类,方法,以及方法参数做了一层基于writable的封装,依赖HbaseObjectWritable类序列化和反序列化参数。这个比较漫长,也比较枯燥,专门拿一天写一篇来解析吧。
现在执行序列回到getHRegionConnection,我们拿到并缓存了一个HRegionInterface的一个代理,这个代理在执行的时候实际上是调用HbaseClient的Call方法。至于Call方法里面都做了些神马,后续再看。
既然连接准备好了,就该调用call方法了,call方法实际上是调用了server.multi方法,也就是HRegionInterface的multi方法。这下终于轮到HbaseClient出场了,隐藏得好深。public Writable call(Writable param, InetSocketAddress addr, Class<? extends VersionedProtocol> protocol, User ticket, int rpcTimeout) throws InterruptedException, IOException { Call call = new Call(param);//param,这里就是封装好类名、方法名以及参数的invocation对象 Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);//这里开始连接,亲,你真的要连接了吗,不要骗人啊。 connection.sendParam(call); // MS真的连接了,还把执行代理发了出去 boolean interrupted = false; //noinspection SynchronizationOnLocalVariableOrMethodParameter synchronized (call) { //如果请求没完成,连接没中断,线程没中断,没超时没错误,等吧。 while (!call.done) { if (connection.shouldCloseConnection.get()) { throw new IOException("Unexpected closed connection"); } try { call.wait(1000); // wait for the result } catch (InterruptedException ignored) { // save the fact that we were interrupted interrupted = true; } } if (interrupted) { // set the interrupt flag now that we are done waiting Thread.currentThread().interrupt(); } if (call.error != null) { if (call.error instanceof RemoteException) { call.error.fillInStackTrace(); throw call.error; } // local exception throw wrapException(addr, call.error); } return call.value; } }