Storm入门之第五章Bolts
本文翻译自《Getting Started With Storm》 译者:吴京润 编辑:方腾飞
第5章 Bolts
正如你已经看到的,bolts是一个Storm集群中的关键组件。你将在这一章学到bolt生命周期,一些bolt设计策略,以及几个有关这些内容的例子。
Bolt生命周期
Bolt是这样一种组件,它把元组作为输入,然后产生新的元组作为输出。实现一个bolt时,通常需要实现IRichBolt接口。Bolts对象由客户端机器创建,序列化为拓扑,并提交给集群中的主机。然后集群启动工人进程反序列化bolt,调用prepare,最后开始处理元组。
NOTE:要创建一个bolt对象,它通过构造器参数初始化成员属性,bolt被提交到集群时,这些属性值会随着一起序列化。
Bolt结构
Bolts拥有如下方法:
declareOutputFields(OutputFieldsDeclarer declarer) 为bolt声明输出模式 prepare(java.util.Map stormConf, TopologyContext context, OutputCollector collector) 仅在bolt开始处理元组之前调用 execute(Tuple input) 处理输入的单个元组 cleanup() 在bolt即将关闭时调用
下面看一个例子,在这个例子中bolt把一句话分割成单词列表:
class SplitSentence implements IRichBolt { private OutputCollector collector; publlic void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple tuple) { String sentence = tuple.getString(0); for(String word : sentence.split(" ")) { collector.emit(new Values(word)); } } public void cleanup(){} public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
正如你所看到的,这是一个很简单的bolt。值得一提的是在这个例子里,没有消息担保。这就意味着,如果bolt因为某些原因丢弃了一些消息——不论是因为bolt挂了,还是因为程序故意丢弃的——生成这条消息的spout不会收到任何通知,任何其它的spouts和bolts也不会收到。
然而在许多情况下,你想确保消息在整个拓扑范围内都被处理过了。
可靠的bolts和不可靠的bolts
正如前面所说的,Storm保证通过spout发送的每条消息会得到所有bolt的全面处理。基于设计上的考虑,这意味着你要自己决定你的bolts是否保证这一点。
拓扑是一个树型结构,消息(元组)穿过其中一条或多条分支。树上的每个节点都会调用ack(tuple)或fail(tuple),Storm因此知道一条消息是否失败了,并通知那个/那些制造了这些消息的spout(s)。既然一个Storm拓扑运行在高度并行化的环境里,跟踪始发spout实例的最好方法就是在消息元组内包含一个始发spout引用。这一技巧称做锚定(译者注:原文为Anchoring)。修改一下刚刚讲过的SplitSentence,使它能够确保消息都被处理了。
class SplitSentence implenents IRichBolt { private OutputCollector collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple tuple) { String sentence = tuple.getString(0); for(String word : sentence.split(" ")) { collector.emit(tuple, new Values(word)); } collector.ack(tuple); } public void cleanup(){} public void declareOutputFields(OutputFieldsDeclarer declarer){ declar.declare(new Fields("word")); } }
锚定发生在调用collector.emit()时。正如前面提到的,Storm可以沿着元组追踪到始发spout。collector.ack(tuple)和collector.fail(tuple)会告知spout每条消息都发生了什么。当树上的每条消息都已被处理了,Storm就认为来自spout的元组被全面的处理了。如果一个元组没有在设置的超时时间内完成对消息树的处理,就认为这个元组处理失败。默认超时时间为30秒。
NOTE:你可以通过修改Config.TOPOLOGY_MESSAGE_TIMEOUT修改拓扑的超时时间。
当然了spout需要考虑消息的失败情况,并相应的重试或丢弃消息。
NOTE:你处理的每条消息要么是确认的(译者注:collector.ack())要么是失败的(译者注:collector.fail())。Storm使用内存跟踪每个元组,所以如果你不调用这两个方法,该任务最终将耗尽内存。
多数据流
一个bolt可以使用emit(streamId, tuple)把元组分发到多个流,其中参数streamId是一个用来标识流的字符串。然后,你可以在TopologyBuilder决定由哪个流订阅它。
多锚定
为了用bolt连接或聚合数据流,你需要借助内存缓冲元组。为了在这一场景下确保消息完成,你不得不把流锚定到多个元组上。可以向emit方法传入一个元组列表来达成目的。
... List anchors = new ArrayList(); anchors.add(tuple1); anchors.add(tuple2); collector.emit(anchors, values); ...
通过这种方式,bolt在任意时刻调用ack或fail方法,都会通知消息树,而且由于流锚定了多个元组,所有相关的spout都会收到通知。
使用IBasicBolt自动确认
你可能已经注意到了,在许多情况下都需要消息确认。简单起见,Storm提供了另一个用来实现bolt的接口,IBasicBolt。对于该接口的实现类的对象,会在执行execute方法之后自动调用ack方法。
class SplitSentence extends BaseBasicBolt { public void execute(Tuple tuple, BasicOutputCollector collector) { String sentence = tuple.getString(0); for(String word : sentence.split(" ")) { collector.emit(new Values(word)); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
NOTE:分发消息的BasicOutputCollector自动锚定到作为参数传入的元组。
原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: Storm入门之第五章Bolts
Hi,
I am developing STORM @ Twitter, and happily see your transaltion on this book.
One small problem(typo?)
Under “Bolt结构”
declareOutputFields(OutputFieldsDeclarer declarer)
为bolt声明输入模式
It is obviously declareOutputFields is used for “为bolt声明输出模式”.
Good job!
thanks for your attention.
i’ve modified it.
while i dout that why you can read chinese? are you chinese? but why don’t you reply me in chinese?
I am Chinese but the Chinese input in my laptop in pretty bad so it is very difficult to type. Sorry for that