Twitter Storm: Storm的一些常见模式
作者:?xumingming?| 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明
网址:?http://xumingming.sinaapp.com/189/twitter-storm-storm的一些常见模式/
?
本文翻译自:https://github.com/nathanmarz/storm/wiki/Common-patterns。
这篇文章列举出了storm topology里面的一些常见模式:
?
流聚合把两个或者多个数据流聚合成一个数据流 — 基于一些共同的tuple字段。流聚合和SQL里面table join很像,只是table join的输入是有限的,并且join的语义是非常明确的。而流聚合的语义是不明确的并且输入流是无限的。
流类型的聚合类型跟具体的应用是有关了。一些应用把两个流发出的所有的tuple都聚合起来 — 不管多长时间;而另外一些应用则只会聚合一些特定的tuple。而另外一些应用的聚合逻辑又可能完全不一样。而这些聚合类型里面最常见的类型是把所有的输入流进行一样的划分, 这个在storm里面用fields grouping在相同字段上进行grouping就可以了,比如:
帮助1234builder.setBolt(
5
,
new
MyJoiner(), parallelism)
????
.fieldsGrouping(
1
,
new
Fields(
"field1"
,
"field2"
))
????
.fieldsGrouping(
2
,
new
Fields(
"field1"
,
"field2"
))
????
.fieldsGrouping(
3
,
new
Fields(
"field1"
,
"field2"
));
当然,不同的数据流的“相同”字段可以有不一样的名字。
?
有时候为了性能或者一些别的原因, 你可能想把一组tuple一起处理, 而不是一个个单独处理。比如,你可能想批量更新数据库。
如果你想让你的数据处理具有可靠性,正确的方式是保存这些tuple对象的引用直到bolt批量处理这些tuple了。一旦这个批量操作结束, 你可以批量的ack这些tuple。
如果一个bolt发射tuple, 那么你可能想用multi-anchoring来保证可靠性。这一切都取决于具体的应用。关于storm的消息传递的工作原理可以看这篇:?Twitter Storm如何保证消息不丢失。
?
很多bolt有些类似的模式:
遵循这类模式的bolt一般是函数或者是过滤器, 这种模式太常见,storm为这类模式单独封装了一个接口:?IBasicBolt。更多的信息请看:?Twitter Storm如何保证消息不丢失。
?
在bolt的内存里面缓存一些东西非常常见。缓存在和fields grouping结合起来之后就更有用了。比如,你有一个bolt把短链接变成长链接(bit.ly, t.co之类的)。你可以把短链接到长链接的对应关系利用LRU算法缓存在内存里面以避免重复计算。比如组件一发射短链接,组件二把短链接转化成长链接并缓存在内存里面。看一下下面两段代码有什么不一样:
帮助12builder.setBolt(
2
,
new
ExpandUrl(), parallelism)
??
.shuffleGrouping(
1
);
帮助12builder.setBolt(
2
,
new
ExpandUrl(), parallelism)
??
.fieldsGrouping(
1
,
new
Fields(
"url"
));
第二种方式的缓存会比第一种方式的缓存的效率高很多,因为同样的短链接始终被发到同一个task。这会避免不同的机器上有同样的缓存 ?— 浪费内存, 同时也使得同样的短域名更可能在内存里面找到缓存。
?
storm的一个常见的持续计算的模式叫做: “streaming top N”。
比如你有一个bolt发射这样的tuple: ["value", "count"]并且你想一个bolt基于这些信息算出top N的tuple。最简单的办法是有一个bolt可以做一个全局的grouping的动作并且在内存里面保持这top N的值。
这个方式对于大数据量的流显然是没有扩展性的, 因为所有的数据会被发到同一台机器, 单机的处理能力始终是有极限的。一个更好的方法是在多台机器上面并行的计算这个流每一部分的top N, 然后再有一个bolt合并这些机器上面所算出来的top N以算出最后的top N(Map Reduce的思想), 代码大概是这样的:
帮助1234builder.setBolt(
2
,
new
RankObjects(), parallellism)
??
.fieldsGrouping(
1
,
new
Fields(
"value"
));
builder.setBolt(
3
,
new
MergeObjects())
??
.globalGrouping(
2
);
这个模式之所以可行是因为第一个bolt的fields grouping使得这种并行算法在语义上是正确的。
用TimeCacheMap来高效地保存一个最近被更新的对象的缓存
有时候你想在内存里面保存一些最近活跃的对象,以及让那些不再活跃的对象自动过期(删除掉)。TimeCacheMap是一个非常高效的数据结构,它提供了一些callback函数使得我们在对象不再活跃的时候做一些事情。关于TimeCacheMap为什么高效,可以看看这篇分析文章
?
用storm做分布式RPC应用的时候有两种比较常见的模式:它们被封装在?CoordinatedBolt和KeyedFairBolt里面。
CoordinatedBolt包装你的bolt,并且确定什么时候你的bolt已经接收到所有的tuple。它主要使用Direct Stream来做这个。
KeyedFairBolt同样包装你的bolt并且保证你的topology同时处理多个DRPC调用,而不是串行地一次只执行一个。
更多有关分布式RPC的信息可以看这里。