首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 企业软件 > 行业软件 >

HBASE 代码阅览笔记-1 - PUT-3-提交任务1(基于0.94.12)

2013-11-15 
HBASE 代码阅读笔记-1 - PUT-3-提交任务1(基于0.94.12)终于把RS的定位问题搞清楚了些些,时间不等人,马上看

HBASE 代码阅读笔记-1 - PUT-3-提交任务1(基于0.94.12)
终于把RS的定位问题搞清楚了些些,时间不等人,马上看看connection.processBatch中,step2是如何把任务提交到服务端的吧

之前已经看到,首先创建了一个Callable<MuiltyResponse>对象,而该对象的call方法实际上又创建了一个ServerCallable<MultiResponse> 对象,然后调用了它的withoutRetries方法。
这个方法很简单,调用了connect方法和multi方法
一个个开始啃吧,先看看connect,其中server是ServerCallable的成员,HRegionInterface类型



好吧,原来rpcClient.getProxy才是重头戏,一次次的浇熄哥接近终点的热情

RPCEgine:WritableRpcEngine,你也可以通过hbase.rpc.engine配置一个全限定的类名来覆盖它

Invocation类其实主要是将要执行的类,方法,以及方法参数做了一层基于writable的封装,依赖HbaseObjectWritable类序列化和反序列化参数。这个比较漫长,也比较枯燥,专门拿一天写一篇来解析吧。

现在执行序列回到getHRegionConnection,我们拿到并缓存了一个HRegionInterface的一个代理,这个代理在执行的时候实际上是调用HbaseClient的Call方法。至于Call方法里面都做了些神马,后续再看。

既然连接准备好了,就该调用call方法了,call方法实际上是调用了server.multi方法,也就是HRegionInterface的multi方法。这下终于轮到HbaseClient出场了,隐藏得好深。

public Writable call(Writable param, InetSocketAddress addr,                         Class<? extends VersionedProtocol> protocol,                         User ticket, int rpcTimeout)            throws InterruptedException, IOException {        Call call = new Call(param);//param,这里就是封装好类名、方法名以及参数的invocation对象        Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);//这里开始连接,亲,你真的要连接了吗,不要骗人啊。        connection.sendParam(call);                 // MS真的连接了,还把执行代理发了出去        boolean interrupted = false;        //noinspection SynchronizationOnLocalVariableOrMethodParameter        synchronized (call) {            //如果请求没完成,连接没中断,线程没中断,没超时没错误,等吧。            while (!call.done) {                if (connection.shouldCloseConnection.get()) {                    throw new IOException("Unexpected closed connection");                }                try {                    call.wait(1000);                       // wait for the result                } catch (InterruptedException ignored) {                    // save the fact that we were interrupted                    interrupted = true;                }            }            if (interrupted) {                // set the interrupt flag now that we are done waiting                Thread.currentThread().interrupt();            }            if (call.error != null) {                if (call.error instanceof RemoteException) {                    call.error.fillInStackTrace();                    throw call.error;                }                // local exception                throw wrapException(addr, call.error);            }            return call.value;        }    }


热点排行