storm-0.8.2源码分析之nimbus运行过程(一)
nimbus运行过程
这部分将详细讲述启动后,nimbus是如何运行的。在之前讲述nimbus启动过程,有提到nimbus.clj中有个通过defserverfn定义的核心函数service-handler,是实现了Thrift的service Nimbus代码生成的Nimbus.Iface这个句柄类,这个句柄类需要用户自己去实现内部每个函数的处理流程。在rpc client调用相应的函数接口,rpc server就会调用用户实现的句柄类中对应的函数接口,因此这里就需要我们去关注,在不同接口被调用后,是怎么处理的。特别是topology被上传到nimbus这个任务是如何被解析与分发到supervisor。
从头到尾一次分析这个核心函数,会一点点分析,避免一大块牵扯的东西过多。
(defserverfn service-handler [conf inimbus]
(.prepare inimbus conf (master-inimbus-dir conf))
(log-message "Starting Nimbus with conf " conf)
(let [nimbus (nimbus-data conf inimbus)]
...
)
....)
第一句调用prepare的是standalone-nimbus函数实现的INimbus接口类中的prepare方法,这个方法是个空的实现。
第二句是打印log,也不用关心
之后,是let 的定义,nimbus是一个变量,这个变量是的值,是(nimbus-data conf inimbus)的返回值,也就是一个map。这个这个map包含了之后处理所需要的数据,需要我们仔细去跟踪内部到底有些什么东西。
nimbus-datanimbus-data函数的定义如下所示
(defn nimbus-data [conf inimbus]
(let [forced-scheduler (.getForcedScheduler inimbus)]
{:conf conf
:inimbus inimbus
:submitted-count (atom 0)
:storm-cluster-state (cluster/mk-storm-cluster-state conf)
:submit-lock (Object.)
:heartbeats-cache (atom {})
:downloaders (file-cache-map conf)
:uploaders (file-cache-map conf)
:uptime (uptime-computer)
:validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
:timer (mk-timer :kill-fn (fn [t]
(log-error t"Error when processing event")
(halt-process!20 "Error when processing an event")
))
:scheduler (mk-scheduler conf inimbus)
}))
let的bindings定义的变量forced-scheduler的值是个nil,因为INimbus中实现的getForcedScheduler方法是个空实现。
再看看let 块的执行体,它是一个map。其中:conf 的值是一个storm 配置的map,:inimbus的值是INimbus接口的匿名实现也就是一个适配器。submitted-count 的值是一个atom类型的值,初始化为0。:storm-cluster-state的值是什么,需要看看cluster/mk-storm-cluster-state这个函数内部是做了什么。
cluster/mk-storm-cluster-state这个函数内部匿名实现了StormClusterState协议,而实现时又用到了ClusterState协议。这个函数主要用于获取集群的各种状态,底层zookeeper的操作上层封装在了ClusterState协议的实现,而真正底层访问zookeeper使用apache curator这个开源库。
(defn mk-storm-cluster-state[cluster-state-spec]
(let [[solo? cluster-state] (if (satisfies? ClusterStatecluster-state-spec)
[falsecluster-state-spec]
[true(mk-distributed-cluster-state cluster-state-spec)])
先看看几句,因为ClusterState协议的实现是匿名类型的实现,因此satisfies? 判断得到false,也就是使用[true (mk-distributed-cluster-state cluster-state-spec)] 对[solo?cluster-state]进行赋值。
后面是初始化一相关的callback,这很简单,代码如下
assignment-info-callback(atom {})
supervisors-callback (atom nil)
assignments-callback (atom nil)
storm-base-callback (atom {})
之后是生成state id的,调用的是mk-storm-cluster-state适配器中匿名实现StormClusterState协议中的register方法。register方法内部会生成id,并记录到map中,之后返回一个id作为注册id。
state-id(register
cluster-state
(fn [type path]
(let [[subtree & args](tokenize-path path)]
(condp = subtree
ASSIGNMENTS-ROOT (if(empty? args)
(issue-callback! assignments-callback)
(issue-map-callback! assignment-info-callback(first args)))
SUPERVISORS-ROOT(issue-callback! supervisors-callback)
STORMS-ROOT(issue-map-callback! storm-base-callback (first args))
;; this shouldnever happen
(halt-process! 30"Unknown callback for subtree " subtree args)
)
)))]
之后就是在zookeeper上创建一些列根目录,用于存放集群交互的各种信息,目录分别是:
/assignments
/storms
/supervisors
/workerbeats
/errors
再后面就是使用reify关键字匿名实现StormClusterState协议,在实现过程中依赖到了cluster-state,也就是ClusterState协议的实现。其中StormClusterState中的各个接口具体的作用,在之后分析过程中如果遇到,再回过头再解释。
现在回到nimbus-data接着往下分析
:submit-lock (Object.)
:heartbeats-cache (atom {})
:downloaders (file-cache-map conf)
submit-lock就是一个java的Object对象用于加锁解锁,heartbeats-cache是一个atom引用类型,这个引用所包含的的值是一个空的map。
downloaders是一个TimeCacheMap对象。downloaders的值是(file-cache-mapconf)
其中file-cache-map是一个函数,定义如下
(defn file-cache-map [conf]
(TimeCacheMap.
(int (conf NIMBUS-FILE-COPY-EXPIRATION-SECS))
(reify TimeCacheMap$ExpiredCallback
(expire [this id stream]
(.close stream)
))
))
TimeCacheMap是storm自己实现的,对外提供了ExpiredCallback接口类,这个接口类中只有一个方法是expire方法。在file-cache-map函数中匿名实现了这个接口。
TimeCacheMap对外展示的是一个类似map类,可以在生成这个对象的时候设置超时回调函数(也就是ExpiredCallback这个接口,超时处理,在这里就是关闭流),之后可以像map一样put,get进行操作,在超时时间到了之后,会调用用户设置的回到函数,将超时的数据传递到回调函数中进行处理。需要注意的是,这个类的超时不是很精确。
这里说下为什么会超时不是很精确
TimeCacheMap部分代码如下:
private LinkedList<HashMap<K,V>> _buckets;
_callback = callback;
final long expirationMillis = expirationSecs * 1000L;
final long sleepTime = expirationMillis / (numBuckets-1);
_cleaner = new Thread(new Runnable() {
public void run() {
try {
while(true) {
Map<K, V> dead =null;
Time.sleep(sleepTime);
synchronized(_lock) {
dead =_buckets.removeLast();
_buckets.addFirst(new HashMap<K, V>());
}
if(_callback!=null) {
for(Entry<K,V> entry: dead.entrySet()) {
_callback.expire(entry.getKey(), entry.getValue());
}
}
}
} catch (InterruptedExceptionex) {
}
}
});
_cleaner.setDaemon(true);
_cleaner.start();
_buckets是一个桶,桶里面有若干个子桶,也就是map。以默认值来看,_buckets是有3个子桶,而NIMBUS-FILE-COPY-EXPIRATION-SECS的超时时间是600秒,那么在计算得到的sleepTime就是300秒,在线程中每次进来会先睡300秒,之后去取最后一个子桶,并插入一个新的空的子桶到_buckets头部。之后是将超时的那个子桶中的所有数据换地给callback进行处理。
在看put的操作
public void put(K key, V value) {
synchronized(_lock) {
Iterator<HashMap<K, V>> it = _buckets.iterator();
HashMap<K, V> bucket = it.next();
bucket.put(key, value);
while(it.hasNext()) {
bucket = it.next();
bucket.remove(key);
}
}
}
put总是将数据插入到_buckets的第一个子桶中,并扫描后面的子桶是否有包含put进来的数据,如果有,则移除掉。
那么现在问题来了,如果在线程sleep时,插入一条数据到_buckets的第一个子桶中,它的超时时间将是在2*300至3*300这个时间范围内,因此这个超时时间是不保证精确的,只保证在指定的超时间到了之后会被回调,而非在到的时候立即触发回调。
回到nimbus-data接着看
:uploaders(file-cache-map conf)
:uptime (uptime-computer)
uploaders的值和前面的downloader的值是一样的,也是一个TimeCacheMap对象。
uptime的值是一个函数,是uptime-computer这个函数所返回的一个匿名函数,如下所示:
(defn uptime-computer []
(let [start-time (current-time-secs)]
(fn []
(time-delta start-time)
)))
uptime-computer就是用于计算时间差的,需要注意的是,在添加:uptime (uptime-computer)
这条记录的时候,start-time已经开始取当前时间了,那么在之后真正取:uptime的值开始计算的时候,就会得到时间差。
在service-handler中有条语句nimbus-uptime ((:uptime nimbus)) 这就会计算出,在设置nimbus这个map的时候时间点和当前时间的差。两层圆括号的意思是,先从map中取出值,因为值是一个函数,所以还要再用一对圆括号括起来执行。
回到nimbus-data接着看
:validator(new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
:timer (mk-timer :kill-fn (fn [t]
(log-error t "Error whenprocessing event")
(halt-process!20 "Error when processing an event")
))
:scheduler (mk-scheduler conf inimbus)
validator的值是一个对象。类名是在default.yaml中所指定的nimbus.topology.validator:"backtype.storm.nimbus.DefaultTopologyValidator",这里的先取到类名,然后根据类名创建对象。
timer的值是一个map ,这个map中包含了定时器,队列,锁等数据。在之前分析supervisor启动过程中也有提到mk-timer,这里再来回顾一下。mk-timer和java中的Timer是很像的。mk-timer的做法是启动一个线程,循环从队列中去peek一个数据,这个数据是一个vector类型的数据,内有会有三个值,分别是时间,函数,和uuid。线程把当前时间和从队列预读的这个数据中的时间值进行比较,如果时间到了,则从队列中弹出这个数据,然后执行这个数据中的第二个值,也就是函数。在执行完之后,会sleep 1秒,这个sleep的实现是在storm自己提供的Time.java中。当sleep结束,会重复这个动作。
mk-timer最后会返回的值如下所示:
{:timer-thread timer-thread
:queue queue
:active active
:lock lock
:cancel-notifier notifier}
也就是定时器线程,队列,锁等数据。
scheduler的值是一个调度器。这个调度器可以有三种,默认的调度器,从INimbus中获取的调度器,以及是用户自定义的调度器。在这里取到的是默认的调度器。
到这,nimbus-data就分析完了。总的来说,小小的一个map,内部还是隐藏了很多的东西,在经过分析之后,之后这个nimbus-data在service-handler中被用到的时候,就知道取的什么值了。