HADOOP 处理 XML 样例
前几天去亿阳信通面试,被一个很胖的兄弟问了一个问题,不知道咋处理,回来特意研究了一下,希望能为其他兄弟提供帮助。
问题是,HADOOP如何来处理结构化数据,比如大量的XML
答案如下(注意:StreamInputFormat 和 StreamXmlRecordReader 位于 hadoop-streaming 包中,运行时需将对应的 jar 加入 classpath;并且貌似只能配合旧版本 API 使用,即 org.apache.hadoop.mapred):
package com.liangc.hadoop.mr;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.streaming.StreamInputFormat;
import org.apache.hadoop.streaming.StreamXmlRecordReader;

/**
 * Example of processing structured (XML) input with the old mapred API.
 *
 * <p>{@code StreamInputFormat} + {@code StreamXmlRecordReader} split the input
 * on the configured begin/end tags, so each {@code map()} call receives one
 * whole XML fragment. Requires the hadoop-streaming jar on the classpath.
 *
 * <p>Usage: {@code XmlMR <input path> <output path>}
 */
public class XmlMR {

    /** Emits each XML record as both key and value. */
    public static class MyMap extends MapReduceBase
            implements Mapper<Text, Text, Text, Text> {
        @Override
        public void map(Text key, Text value, OutputCollector<Text, Text> ctx, Reporter r)
                throws IOException {
            // StreamXmlRecordReader delivers the whole <property>...</property>
            // fragment as the key; the value is empty, so we emit the key twice.
            System.out.println("map :::::: " + key.toString());
            ctx.collect(key, key);
        }
    }

    /**
     * Concatenates all values grouped under one key; the output key is the
     * byte length of the input key.
     */
    public static class MyReduce extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterator<Text> value, OutputCollector<Text, Text> ctx,
                Reporter r) throws IOException {
            // StringBuilder: a single reduce() call is single-threaded, so the
            // synchronized StringBuffer is unnecessary.
            StringBuilder sb = new StringBuilder();
            while (value.hasNext()) {
                Text v = value.next();
                System.out.println("reduce :::::: " + v.toString());
                sb.append(v.toString());
            }
            // Text.getLength() is the key's byte length (UTF-8 encoded).
            ctx.collect(new Text(key.getLength() + ""), new Text(sb.toString()));
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args args[0] = input path, args[1] = output path
     * @throws Exception if job submission or execution fails
     */
    public static void main(String[] args) throws Exception {
        // Fail fast with a usage message instead of an
        // ArrayIndexOutOfBoundsException on a missing argument.
        if (args.length < 2) {
            System.err.println("Usage: XmlMR <input path> <output path>");
            System.exit(2);
        }
        String input = args[0];
        String output = args[1];
        // Record delimiters: everything between these tags is one map record.
        String begin = "<property>";
        String end = "</property>";

        JobConf conf = new JobConf(XmlMR.class);
        conf.setJobName("xmlMR");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(MyMap.class);
        conf.setReducerClass(MyReduce.class);
        conf.setInputFormat(StreamInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        // NOTE(review): this re-binds the same input format and mapper already
        // set above; MultipleInputs is only needed when mixing several input
        // formats. Kept for fidelity with the original example.
        MultipleInputs.addInputPath(conf, new Path(input), StreamInputFormat.class, MyMap.class);
        FileOutputFormat.setOutputPath(conf, new Path(output));

        // Tell StreamInputFormat to use the XML record reader, and which
        // begin/end tags delimit a single record.
        conf.set("stream.recordreader.class", StreamXmlRecordReader.class.getName());
        conf.set("stream.recordreader.begin", begin);
        conf.set("stream.recordreader.end", end);

        JobClient.runJob(conf);
    }
}