storm 实战及实例讲解(三)
storm 实战及实例讲解(三)
——comaple.zhang
——2012-09-13
本讲将接着上一讲,把一个完整的topology完成。上一节主要介绍了一个基本的topology的构造过程,以及每一步所对应的storm集群中分配的资源情况。要想开发storm应用,必须对上一讲我提到的那些概念有完全的了解,否则开发出来的应用很有可能出现这样那样的问题而无法工作。那么接下来我们来一起定义一个spout节点和bolt节点。
spout节点:在实际的开发中这个节点可以起到和外界沟通的作用,它可以从一个数据库中按照某种规则取数据,也可以从分布式队列中取任务(该方式我会在后续章节中谈到)。这里我们将开发一个简单的模拟数据喷发的节点。具体方式见代码:
package com.jd.comaple.storm.test.spout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
import java.util.Random;
/**
 * Demo spout that simulates an external data source: every call to
 * {@link #nextTuple()} emits one randomly chosen sample record as a
 * single-field tuple and then pauses briefly.
 *
 * Original author: comaple.zhang (file created 12-8-28).
 */
public class SimpleSpout extends BaseRichSpout {

    /** Pause after each emitted tuple, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 100L;

    /** Sample records; one of them is picked at random for every emitted tuple. */
    private static final String[] info = new String[]{
            "comaple\t,12424,44w46,654,12424,44w46,654,",
            "lisi\t,435435,6537,12424,44w46,654,",
            "lipeng\t,45735,6757,12424,44w46,654,",
            "hujintao\t,45735,6757,12424,44w46,654,",
            "jiangmin\t,23545,6457,2455,7576,qr44453",
            "beijing\t,435435,6537,12424,44w46,654,",
            "xiaoming\t,46654,8579,w3675,85877,077998,",
            "xiaozhang\t,9789,788,97978,656,345235,09889,",
            "ceo\t,46654,8579,w3675,85877,077998,",
            "cto\t,46654,8579,w3675,85877,077998,",
            "zhansan\t,46654,8579,w3675,85877,077998,"};

    /** Collector used to emit tuples; assigned in {@link #open}. */
    private SpoutOutputCollector collector;

    /** Random source used to choose which sample record to emit. */
    Random rd = new Random();

    /**
     * Stores the collector so that {@link #nextTuple()} can emit through it.
     *
     * @param conf      configuration map handed to the spout (not used here)
     * @param context   topology context (not used here)
     * @param collector collector used for emitting tuples
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Emits one tuple holding a randomly selected sample record, then sleeps
     * for {@link #EMIT_INTERVAL_MS} milliseconds to simulate waiting for data.
     * This method is expected to be invoked over and over by the framework.
     */
    @Override
    public void nextTuple() {
        // Bound the index by the array length so that every record, including
        // the last one, can be selected (a hard-coded 10 skipped the 11th entry).
        String msg = info[rd.nextInt(info.length)];
        // Emit the record as a one-field tuple.
        collector.emit(new Values(msg));
        try {
            // Simulate waiting for the next piece of data.
            Thread.sleep(EMIT_INTERVAL_MS);
        } catch (InterruptedException e) {
            // Restore the interrupt status so the calling thread can observe
            // the interruption and shut down instead of silently losing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Declares the single output field, named "source". The field name matters
     * when a downstream component subscribes with a fields grouping; for more
     * complex topologies {@code declarer.declareStream()} can be used to
     * define explicit stream ids.
     *
     * @param declarer declarer used to register the output fields
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("source"));
    }
}
bolt节点: 处理节点,该节点接收喷发节点发送的数据进行简单的处理后,发射出去。
package com.jd.comaple.storm.test.bolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * Demo processing bolt: reads the first field of each incoming tuple as a
 * string, appends a fixed marker text to it and emits the result as a
 * single-field tuple named "info".
 *
 * Original author: comaple.zhang (file created 12-8-28).
 */
public class SimpleBolt extends BaseBasicBolt {

    /**
     * Processes one tuple. Tuples whose first field is {@code null} produce no
     * output. Any exception raised while processing is printed to standard
     * error and otherwise ignored.
     *
     * @param input     incoming tuple; its first field is read as a string
     * @param collector collector used to emit the processed value
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        try {
            final String payload = input.getString(0);
            if (payload == null) {
                // Nothing to process for an empty field.
                return;
            }
            final String processed = payload + "mesg is processed!";
            collector.emit(new Values(processed));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Declares the single output field, named "info".
     *
     * @param declarer declarer used to register the output fields
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        final Fields outputFields = new Fields("info");
        declarer.declare(outputFields);
    }
}