storm-0.8.2源码分析之nimbus启动
nimbus启动脚本启动逻辑
通过bin/stormnimbus会启动nimbus进程,类似hadoop的jobtracker。
bin/storm是一个python写的脚本,支持jar,kill,nimbus,supervisor,ui,drpc等等命令。
python的入口函数如下
def main():
if len(sys.argv) <= 1:
print_usage()
sys.exit(-1)
global CONFIG_OPTS
config_list, args = parse_config_opts(sys.argv[1:])
parse_config(config_list)
COMMAND = args[0]
ARGS = args[1:]
(COMMANDS.get(COMMAND, unknown_command))(*ARGS)
最后这条语句将根据不同的命令调用其对应的方法,启动nimbus这里调用是
nimbus(klass="backtype.storm.daemon.nimbus"),在nimbus方法内部会调用exec_storm_class(所有命令所调用的方法内部都会调用这个),根据指定的入口类backtype.storm.daemon.nimbus去启动程序。exec_storm_class内部其实就是通过exec执行java 参数 包名.类名 来启动对应的程序。
nimbus的clojure入口
nimbus从脚本中执行的时候,会根据之前传递的backtype.storm.daemon.nimbus这个类,去定位到backtype/storm/daemon/nimbus.clj这个clojure的文件(关于clojure的命名空间与实际文件的映射其实和java中是一样的的),其中-main是nimbus.clj的入口函数,相当c++/java中的main入口。
-main非常简单,如下所示
(defn -main []
(-launch (standalone-nimbus)))
下面将依次分析,standalong-numbus,以及-launch args各自做了些什么。
standalong-numbus是个函数,在这个函数内部是实现INimbus的接口,大致过程是用到了clojure.core.protocols中的reify来定义了匿名的类型实现了java中的package backtype.storm.scheduler这个包中的INimbus这个接口(clojure中接口的对应物称为协议)
下面代码中参数列表中的第一个参数this,是类似c++/java中的类的成员函数的第一个隐藏参数this,也就是通过this确定到底是使用这个协议的哪个实现。
先看standalone-nimbus的代码如下
(defn standalone-nimbus []
(reify INimbus
(prepare [this conf local-dir]
)
(allSlotsAvailableForScheduling [this supervisors topologiestopologies-missing-assignments]
(->> supervisors
(mapcat (fn [^SupervisorDetails s]
(for [p (.getMeta s)]
(WorkerSlot. (.getId s)p))))
set ))
(assignSlots [this topology slots]
)
(getForcedScheduler [this]
nil )
(getHostName [this supervisors node-id]
(if-let [^SupervisorDetails supervisor (get supervisors node-id)]
(.getHost supervisor)))
))
allSlotsAvailableForScheduling 按clojure的语义进行转下,代码如下
(set ((mapcat (fn [^SupervisorDetails s]
(for [p (.getMeta s)]
(WorkerSlot. (.getId s)p)))) supervisors)
)
也就是说通过遍历Collection<SupervisorDetails> 类型的变量supervisors取id和meta,用来初始化WorkerSolt对象,mapcat将WorkerSolt形成集合,之后将这个集合作为参数传递给set,使之返回符合接口要求的Collection<WorkerSlot>
getHostName 是取主机名的接口,supervisors 的类型是Map<String,SupervisorDetails> ,node-id的类型是String。其过程是从supervisors中通过node-id去找符合条件的SupervisorDetails对象,如果找到了,就调用SupervisorDetails的getHost方法取得主机名,否者返回的是nil
(getHostName [this supervisors node-id]
(if-let [^SupervisorDetails supervisor (get supervisors node-id)]
(.getHost supervisor)))
)
其他几个接口standalone-nimbus中没有去实现。
注:这里涉及到了clojure与java backtype.storm.scheduler包中几个类的交互。
-launch 将之前的standalone-nimbus做为参数传递给-launch,其代码如下:
(defn -launch [nimbus]
(launch-server! (read-storm-config) nimbus))
配置文件读取
首先,先介绍下(read-storm-config)的引用的过程,nimbus.clj的头部定义命名空间的时候,有一句(:use [backtype.storm.daemon common])引入了common.clj。而common.clj中在定义自身名字空间的时候也引用了其他的其他名字空间(:use [backtype.storm log config util]) ,read-storm-config函数就是在config.clj中实现的,其定义如下:
(defn read-storm-config []
(clojurify-structure (Utils/readStormConfig)))
其中Utils/readStormConfig表示使用了java的backtype.storm.utils包中的Utils类的readStormConfig方法,其定义如下:
publicstatic Map readStormConfig() {
Map ret = readDefaultConfig();
String confFile = System.getProperty("storm.conf.file");
Map storm;
if (confFile==null || confFile.equals("")) {
storm = findAndReadConfigFile("storm.yaml", false);
} else {
storm = findAndReadConfigFile(confFile, true);
}
ret.putAll(storm);
ret.putAll(readCommandLineOpts());
return ret;
}
这个方法就是真正去解析配置的入口,解析过程:先读取默认的defualts.yaml的配置,对于源码来说该文件是在conf目录下,对于release版本则是该文件打到了storm-0.8.2.jar内。
其次,再解析用户配置的storm.yaml中的配置项,如果strom.yaml中有配置项与默认配置文件的配置项有冲突,则会覆盖掉默认配置项。最后,取系统环境变量中设置的storm.options的值,这一般都是没有的,因此这步可以跳过。
注:storm的配置文件用到了yaml这种配置格式,可参考其官方http://www.yaml.org/
再回过头看看(Utils/readStormConfig)取得的配置的map做为参数传递给clojurify-structure做了些什么。
(defn clojurify-structure [s]
(prewalk (fn [x]
(cond (instance? Map x) (into {}x)
(instance? List x) (vec x)
true x))
s))
这个函数的功能是将外部数据结构转换成clojure内部的数据结构,在这里,传进来的是java的map,那么会将其转换成clojure的map。另外,prewalk ([f form])的作用简单来说从集合中取值,该值之后会由经过函数计算后的值来替换掉。
因此, (launch-server!(read-storm-config) nimbus)) 中的(read-storm-config) 就是读取storm的配置文件并返回clojure中的map结构。
启动server
启动server的代码的调度是封装在launch-server这个函数内,先看这个函数
(defn launch-server! [conf nimbus]
(validate-distributed-mode! conf)
(let [service-handler (service-handler conf nimbus)
options (-> (TNonblockingServerSocket. (int (confNIMBUS-THRIFT-PORT)))
(THsHaServer$Args.)
(.workerThreads 64)
(.protocolFactory(TBinaryProtocol$Factory.))
(.processor(Nimbus$Processor. service-handler))
)
server (THsHaServer. options)]
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdownservice-handler) (.stop server))))
(log-message "Starting Nimbus server...")
(.serve server)))
先看看第一条语句(validate-distributed-mode!conf)的作用是从底层从配置中读取storm.cluster.mode的值,判断storm.cluster.mode的值配置的是否合法,即,是"local"返回true,是"distributed"返回false,如果是其他的话则抛出异常。之后,validate-distributed-mode!内部判断如果storm.cluster.mode的值是"local"则抛出异常。总结起来简单的说,这个函数是判断在defaults.yaml中的配置项storm.cluster.mode的值是否是"distributed",如果不是的话则会抛出异常(因为,defaults.yaml默认配置的是"distributed")。
;common.clj
(defn validate-distributed-mode! [conf]
(if(local-mode? conf) ;如果是"local"模式会抛出异常
(throw
(IllegalArgumentException. "Cannot start server in localmode!"))))
;config.clj
(defn local-mode? [conf]
(let [mode (conf STORM-CLUSTER-MODE)]; 从配置的map中取storm.cluster.mode的值
(condp = mode
"local" true
"distributed" false
(throw (IllegalArgumentException. ;不是上面两种,抛出异常
(str "Illegal cluster modein conf: " mode)))
)))
看看launch-server!函数中很重要的一句,service-handler(service-handler conf nimbus)
在启动流程的分析过程中,对于这条语句会先简单分析,之后单独拿出来再做详细分析。
(service-handler conf nimbus) 的service-handler其实是一个函数,这个函数不是直接定义的,而是通过defserverfn来定义的,可以在nimbus.clj中找到
(defserverfn service-handler [conf inimbus]...) 其中...是body太长,这里就省略了。
body内部的主要功能是使用clojure的关键字reify去匿名实现一些接口。
内部实现的最主要的接口是package backtype.storm.generated包中的Nimbus类内部的Iface接口。另外还实现了两个接口,分别是package backtype.storm.daemon中的Shutdownable接口,和common.clj中申明的DaemonCommon协议。
这里需要引申说一下,storm是使用thrift作为网络通讯中间件,它提供了序列化和RPC功能,还支持多语言通讯。因此storm是支持多语言客户端的。
使用thrift工具,对编写的.thrift文件生成指定语言的代码,之后只需要实现.thrift中生成的service对应的接口的代码,组织个server启动下,就完成了RPC服务端的功能了。
在storm中.thrift文件位于src/storm.thrift,使用thrift工具生成的代码位于src\jvm\backtype\storm\generated,也就是都属于jvm目录下的packagebacktype.storm.generated。src/storm.thrift内有一个service Nimbus接口,因此在生成backtype.storm.generated包中的Nimbus类内部有接口Iface需要实现,这个就是RPC接口,server实现这些接口,客户端就可以直接调用这些接口,可以像本地调用函数一样调用。
再看分析下(defserverfn service-handler [conf inimbus] ...) 的功能,这条语句相当于是通过宏包裹了函数,并对原有函数执行体进行异常捕捉。可以看到defserverfn的宏定义如下所示:
;common.clj中的宏defserverfn的实现如下所示
(defmacro defserverfn [name & body]
`(let [exec-fn# (fn ~@body)]
(defn ~name [& args#]
(try-cause
(apply exec-fn# args#)
(catch InterruptedException e#
(throw e#))
(catch Throwable t#
(log-error t# "Error on initialization of server " ~(strname))
(halt-process! 13 "Error on initialization")
)))))
;以service-handler带人到宏内,简化后相当于是
;defn server-handler [& args]
;applay (fn [conf inimbus] ... ) [&args]
;其中,server-handler是函数名,而& args就是 confnimbus,三个点号就是原有的函数执行体。
回到launch-server中,看下options最后存的是什么。
options (-> (TNonblockingServerSocket.(int (conf NIMBUS-THRIFT-PORT)))
(THsHaServer$Args.)
(.workerThreads 64)
(.protocolFactory(TBinaryProtocol$Factory.))
(.processor(Nimbus$Processor. service-handler))
)
先解释下->的用法:
在clojure中->的作用是将参数一次传递给下一个form,作为这个form的第一个参数。如果存在多个form这将第一个form计算的结果传递个下一个form,作为下一个form的第一个参数。
因此options后面的代码真正展开就就是如下调用形式:
(.processor
(.protocolFactory
(.workerThreads
(THsHaServer$Args.
(TNonblockingServerSocket. (int(conf NIMBUS-THRIFT-PORT))))
64)
(TBinaryProtocol$Factory.))
(Nimbus$Processor. service-handler))
这里是涉及到了thriftjava库中的相关代码的调用,因此将引入thritf的代码方便解释流程。首先创建一个packageorg.apache.thrift.transport中的TNonblockingServerSocket对象,将给对象作为参数传递给package org.apache.thrift.server中的THsHaServer的内部类Args的构造函数,这样就得到一个Args对象。对Args对象调用workerThreads 方法,设置woker线程数为64,并返回原来的Args对象。然后调用Args的基类AbstractServerArgs(位于TServer.java中)中的protocolFactory方法,并创建一个package org.apache.thrift.protocol中的TBinaryProtocol的内部类Factory的对象,将Factory对象作为AbstractServerArgs.protocolFactory方法的参数,内部是设置inputProtocolFactory,outputProtocolFactory为传递进去的factory,然后返回this,也就是返回Args对象。
最后,调用Args的基类AbstractServerArgs的processor方法,内部就是传递进去的processor参数(由Nimbus$Processor. service-handler这条语句生成),生成processor的工厂类的对象,之后返回this,也就是Args对象。
因此options就是保存了Args对象。
再来看看let内的最后一条语句 server(THsHaServer. options)。其中(THsHaServer. options)是调用了packageorg.apache.thrift.server中的THsHaServer的构造函数来生成一个THsHaServer对象,构造函数是需要一个Args对象,而options保存的就是Args对象。因此server保存了一个THsHaServer对象。
let块的表达式的第一条语句如下:
(.addShutdownHook (Runtime/getRuntime)(Thread. (fn [] (.shutdown service-handler) (.stop server))))
这句的调用转为成java其实就是Runtime.getRuntime.addShutdownHook(Thread hook),而参数hook,就是上面的(Thread.(fn [] (.shutdown service-handler) (.stop server)))
这样的话,就容易理解了,它的作用是,增加JVM停止时要处理的事件,当JVM停止时会调用这些注册的hook。
在看看let块的表达式的最后一条语句(.serveserver) ,这就是启动thrift server。serve方法是在THsHaServer的基类AbstractNonblockingServer中,serve方法的功能就是接收连接处理调用。
其实,熟悉java中如何启动一个thriftserver,对于上诉的在clojure启动thrift server也会比较清楚。如果不清楚在java中是如何启动thriftserver的,可以自行去参考thrift中自带的例子。