storm任务示例
LogProcess.java
package mytest;
?
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.Map;
?
import mytest.ThroughputTest.GenSpout;
?
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
?
public class LogProcess {
?????????public static class FileSpout extends BaseRichSpout {
?
???????????????????/**
????????????????????*/
???????????????????private static final long serialVersionUID = 1L;
???????????????????private SpoutOutputCollector _collector;
???????????????????private BufferedReader br;
???????????????????private String dataFile;
??????????????????
???????????????????//定义spout文件
???????????????????FileSpout(String dataFile){
????????????????????????????this.dataFile = dataFile;
???????????????????}
?
???????????????????//定义如何读取spout文件
???????????????????@Override
???????????????????public void open(Map conf, TopologyContext context,
?????????????????????????????????????SpoutOutputCollector collector) {
????????????????????????????// TODO Auto-generated method stub
????????????????????????????_collector = collector;
????????????????????????????File csv = new File(dataFile); // log file
????????????????????????????try {
?????????????????????????????????????br = new BufferedReader(new FileReader(csv));
????????????????????????????} catch (FileNotFoundException e) {
?????????????????????????????????????// TODO Auto-generated catch block
?????????????????????????????????????e.printStackTrace();
????????????????????????????}
???????????????????}
?
???????????????????//获取下一个tuple的方法
???????????????????@Override
???????????????????public void nextTuple() {
????????????????????????????// TODO Auto-generated method stub
????????????????????????????try {
????????????????????????????????????
?????????????????????????????????????String line = null;
?????????????????????????????????????while ((line = br.readLine()) != null) {
???????????????????????????????????????????????_collector.emit(new Values(line));
?????????????????????????????????????}
????????????????????????????} catch (FileNotFoundException e) {
?????????????????????????????????????// TODO Auto-generated catch block
?????????????????????????????????????e.printStackTrace();
????????????????????????????} catch (IOException e) {
?????????????????????????????????????// TODO Auto-generated catch block
?????????????????????????????????????e.printStackTrace();
????????????????????????????}
???????????????????}
?
?
???????????????????@Override
???????????????????public void declareOutputFields(OutputFieldsDeclarer declarer) {
????????????????????????????// TODO Auto-generated method stub
????????????????????????????declarer.declare(new Fields("line"));
???????????????????}
??????????????????
?????????}
????????
?
?????????public static class Process extends BaseRichBolt{
?
???????????????????private String _seperator;
???????????????????private String _outFile;
???????????????????PrintWriter pw;
???????????????????private OutputCollector _collector;
???????????????????private BufferedWriter bw;
??????????????????
???????????????????public Process(String seperator,String outFile) {
????????????????????????????this._seperator = seperator;
????????????????????????????this._outFile???= outFile;
???????????????????????????
???????????????????}
??????????????????
???????????????????//把输出结果保存到外部文件里面。
???????????????????@Override
???????????????????public void prepare(Map stormConf, TopologyContext context,
?????????????????????????????????????OutputCollector collector) {
????????????????????????????// TODO Auto-generated method stub
????????????????????????????this._collector = collector;
????????????????????????????File out = new File(_outFile);
????????????????????????????try {
//??????????????????????????????????br = new BufferedWriter(new FileWriter(out));
?????????????????????????????????????bw = new BufferedWriter(new OutputStreamWriter(?
?????????????????????????????new FileOutputStream(out, true)));?
????????????????????????????} catch (IOException e1) {
?????????????????????????????????????// TODO Auto-generated catch block
?????????????????????????????????????e1.printStackTrace();
????????????????????????????}????????????????
???????????????????}
??????????????????
???????????????????//blot计算单元,把tuple中的数据添加一个bkeep和回车。然后保存到outfile指定的文件中。
???????????????????@Override
???????????????????public void execute(Tuple input) {
????????????????????????????// TODO Auto-generated method stub
????????????????????????????String line = input.getString(0);
//?????????????????????????System.out.println(line);
????????????????????????????String[] str = line.split(_seperator);
????????????????????????????System.out.println(str[2]);
????????????????????????????try {
?????????????????????????????????????bw.write(str[2]+",bkeep"+"\n");
?????????????????????????????????????bw.flush();
????????????????????????????} catch (IOException e) {
?????????????????????????????????????// TODO Auto-generated catch block
?????????????????????????????????????e.printStackTrace();
????????????????????????????}
???????????????????????????
????????????????????????????_collector.emit(new Values(line));
???????????????????}
?
???????????????????@Override
???????????????????public void declareOutputFields(OutputFieldsDeclarer declarer) {
????????????????????????????// TODO Auto-generated method stub
????????????????????????????declarer.declare(new Fields("line"));
???????????????????}
??????????????????
?????????}
????????
?????????public static void main(String[] argv) throws AlreadyAliveException, InvalidTopologyException{
???????????????????String dataFile = argv[0];?//输入文件
???????????????????String seperator = argv[1];??????//分隔符
???????????????????String outFile???= argv[2];?//输出文件
???????????????????boolean distribute = Boolean.valueOf(argv[3]);???????//本地模式还是集群模式
???????????????????TopologyBuilder builder = new TopologyBuilder();??//build一个topology
????????builder.setSpout("spout", new FileSpout(dataFile), 1);???//指定spout
????????builder.setBolt("bolt", new Process(seperator,outFile),1).shuffleGrouping("spout");??//指定bolt,包括bolt、process和grouping
????????Config conf = new Config();
????????if(distribute){
????????????StormSubmitter.submitTopology("LogProcess", conf, builder.createTopology());
????????}else{
?????????????????LocalCluster cluster = new LocalCluster();
?????????????????cluster.submitTopology("LogProcess", conf, builder.createTopology());
????????}
?????????}???????
}
?
?
运行
[admin@vkvm161064 guandao]$ pwd
/home/admin/guandao
[admin@vkvm161064 guandao]$ ls
out.txt  storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar  tmp.txt
?
输入文件:
[admin@vkvm161064 guandao]$ cat tmp.txt
a,b,c,d
1,2,3,4
A,B,C,D
xx,ff,ff,ss
xx,ff,alibaba,ss
xx,ff,taobao,ss
xx,xx,xx,xx
xx,xx,ll,xx
xx,xx,hero,xx
?
输出文件:
[admin@vkvm161064 guandao]$ cat out.txt
c,bkeep
3,bkeep
C,bkeep
ff,bkeep
alibaba,bkeep
taobao,bkeep
xx,bkeep
ll,bkeep
hero,bkeep
?
提交topology
[admin@vkvm161064 guandao]$ storm jar ./storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar mytest.LogProcess /home/admin/guandao/tmp.txt , /home/admin/guandao/out.txt false
?
语法:storm jar <自己开发的topology的jar包> <topology主类名> <inputfile> <分隔符> <outputfile> <true/false>(true代表集群运行,false代表本地运行)