首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

storm任务示范

2013-10-23 
storm任务示例 LogProcess.java package mytest; import java.io.BufferedReader; import java.io.BufferedW…

storm任务示例

LogProcess.java

package mytest;

?

import java.io.BufferedReader;

import java.io.BufferedWriter;

import java.io.File;

import java.io.FileNotFoundException;

import java.io.FileOutputStream;

import java.io.FileReader;

import java.io.FileWriter;

import java.io.IOException;

import java.io.OutputStreamWriter;

import java.io.PrintWriter;

import java.util.Map;

?

import mytest.ThroughputTest.GenSpout;

?

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.topology.base.BaseRichBolt;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

?

public class LogProcess {

?????????public static class FileSpout extends BaseRichSpout {

?

???????????????????/**

????????????????????*/

???????????????????private static final long serialVersionUID = 1L;

???????????????????private SpoutOutputCollector _collector;

???????????????????private BufferedReader br;

???????????????????private String dataFile;

??????????????????

???????????????????//定义spout文件

???????????????????FileSpout(String dataFile){

????????????????????????????this.dataFile = dataFile;

???????????????????}

?

???????????????????//定义如何读取spout文件

???????????????????@Override

???????????????????public void open(Map conf, TopologyContext context,

?????????????????????????????????????SpoutOutputCollector collector) {

????????????????????????????// TODO Auto-generated method stub

????????????????????????????_collector = collector;

????????????????????????????File csv = new File(dataFile); // log file

????????????????????????????try {

?????????????????????????????????????br = new BufferedReader(new FileReader(csv));

????????????????????????????} catch (FileNotFoundException e) {

?????????????????????????????????????// TODO Auto-generated catch block

?????????????????????????????????????e.printStackTrace();

????????????????????????????}

???????????????????}

?

???????????????????//获取下一个tuple的方法

???????????????????@Override

???????????????????public void nextTuple() {

????????????????????????????// TODO Auto-generated method stub

????????????????????????????try {

????????????????????????????????????

?????????????????????????????????????String line = null;

?????????????????????????????????????while ((line = br.readLine()) != null) {

???????????????????????????????????????????????_collector.emit(new Values(line));

?????????????????????????????????????}

????????????????????????????} catch (FileNotFoundException e) {

?????????????????????????????????????// TODO Auto-generated catch block

?????????????????????????????????????e.printStackTrace();

????????????????????????????} catch (IOException e) {

?????????????????????????????????????// TODO Auto-generated catch block

?????????????????????????????????????e.printStackTrace();

????????????????????????????}

???????????????????}

?

?

???????????????????@Override

???????????????????public void declareOutputFields(OutputFieldsDeclarer declarer) {

????????????????????????????// TODO Auto-generated method stub

????????????????????????????declarer.declare(new Fields("line"));

???????????????????}

??????????????????

?????????}

????????

?

?????????public static class Process extends BaseRichBolt{

?

???????????????????private String _seperator;

???????????????????private String _outFile;

???????????????????PrintWriter pw;

???????????????????private OutputCollector _collector;

???????????????????private BufferedWriter bw;

??????????????????

???????????????????public Process(String seperator,String outFile) {

????????????????????????????this._seperator = seperator;

????????????????????????????this._outFile???= outFile;

???????????????????????????

???????????????????}

??????????????????

???????????????????//把输出结果保存到外部文件里面。

???????????????????@Override

???????????????????public void prepare(Map stormConf, TopologyContext context,

?????????????????????????????????????OutputCollector collector) {

????????????????????????????// TODO Auto-generated method stub

????????????????????????????this._collector = collector;

????????????????????????????File out = new File(_outFile);

????????????????????????????try {

//??????????????????????????????????br = new BufferedWriter(new FileWriter(out));

?????????????????????????????????????bw = new BufferedWriter(new OutputStreamWriter(?

?????????????????????????????new FileOutputStream(out, true)));?

????????????????????????????} catch (IOException e1) {

?????????????????????????????????????// TODO Auto-generated catch block

?????????????????????????????????????e1.printStackTrace();

????????????????????????????}????????????????

???????????????????}

??????????????????

???????????????????//blot计算单元,把tuple中的数据添加一个bkeep和回车。然后保存到outfile指定的文件中。

???????????????????@Override

???????????????????public void execute(Tuple input) {

????????????????????????????// TODO Auto-generated method stub

????????????????????????????String line = input.getString(0);

//?????????????????????????System.out.println(line);

????????????????????????????String[] str = line.split(_seperator);

????????????????????????????System.out.println(str[2]);

????????????????????????????try {

?????????????????????????????????????bw.write(str[2]+",bkeep"+"\n");

?????????????????????????????????????bw.flush();

????????????????????????????} catch (IOException e) {

?????????????????????????????????????// TODO Auto-generated catch block

?????????????????????????????????????e.printStackTrace();

????????????????????????????}

???????????????????????????

????????????????????????????_collector.emit(new Values(line));

???????????????????}

?

???????????????????@Override

???????????????????public void declareOutputFields(OutputFieldsDeclarer declarer) {

????????????????????????????// TODO Auto-generated method stub

????????????????????????????declarer.declare(new Fields("line"));

???????????????????}

??????????????????

?????????}

????????

?????????public static void main(String[] argv) throws AlreadyAliveException, InvalidTopologyException{

???????????????????String dataFile = argv[0];?//输入文件

???????????????????String seperator = argv[1];??????//分隔符

???????????????????String outFile???= argv[2];?//输出文件

???????????????????boolean distribute = Boolean.valueOf(argv[3]);???????//本地模式还是集群模式

???????????????????TopologyBuilder builder = new TopologyBuilder();??//build一个topology

????????builder.setSpout("spout", new FileSpout(dataFile), 1);???//指定spout

????????builder.setBolt("bolt", new Process(seperator,outFile),1).shuffleGrouping("spout");??//指定bolt,包括bolt、process和grouping

????????Config conf = new Config();

????????if(distribute){

????????????StormSubmitter.submitTopology("LogProcess", conf, builder.createTopology());

????????}else{

?????????????????LocalCluster cluster = new LocalCluster();

?????????????????cluster.submitTopology("LogProcess", conf, builder.createTopology());

????????}

?????????}???????

}







运行

[admin@vkvm161064 guandao]$ pwd

/home/admin/guandao

[admin@vkvm161064 guandao]$ ls

out.txt  storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar  tmp.txt



输入文件:

[admin@vkvm161064 guandao]$ cat tmp.txt

a,b,c,d

1,2,3,4

A,B,C,D

xx,ff,ff,ss

xx,ff,alibaba,ss

xx,ff,taobao,ss

xx,xx,xx,xx

xx,xx,ll,xx

xx,xx,hero,xx



输出文件:

[admin@vkvm161064 guandao]$ cat out.txt

c,bkeep

3,bkeep

C,bkeep

ff,bkeep

alibaba,bkeep

taobao,bkeep

xx,bkeep

ll,bkeep

hero,bkeep



提交topology

[admin@vkvm161064 guandao]$ storm jar ./storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar mytest.LogProcess /home/admin/guandao/tmp.txt , /home/admin/guandao/out.txt false



语法:storm jar 自己开发的topology jar包 主类名(如 mytest.LogProcess) inputfile 分隔符 outputfile true/false(true代表集群运行,false代表本地模式运行)

热点排行